risingwave_storage/hummock/compactor/
compaction_utils.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, HashMap, HashSet};
16use std::marker::PhantomData;
17use std::ops::Bound;
18use std::sync::Arc;
19use std::sync::atomic::{AtomicU64, Ordering};
20
21use bytes::Bytes;
22use itertools::Itertools;
23use risingwave_common::constants::hummock::CompactionFilterFlag;
24use risingwave_hummock_sdk::compact_task::CompactTask;
25use risingwave_hummock_sdk::compaction_group::StateTableId;
26use risingwave_hummock_sdk::key::FullKey;
27use risingwave_hummock_sdk::key_range::KeyRange;
28use risingwave_hummock_sdk::sstable_info::SstableInfo;
29use risingwave_hummock_sdk::table_stats::TableStatsMap;
30use risingwave_hummock_sdk::{EpochWithGap, KeyComparator, can_concat};
31use risingwave_pb::hummock::compact_task::PbTaskType;
32use risingwave_pb::hummock::{BloomFilterType, PbLevelType, PbTableSchema};
33use tokio::time::Instant;
34
35pub use super::context::CompactorContext;
36use crate::compaction_catalog_manager::CompactionCatalogAgentRef;
37use crate::hummock::compactor::{
38    ConcatSstableIterator, MultiCompactionFilter, StateCleanUpCompactionFilter, TaskProgress,
39    TtlCompactionFilter,
40};
41use crate::hummock::iterator::{
42    Forward, HummockIterator, MergeIterator, NonPkPrefixSkipWatermarkIterator,
43    NonPkPrefixSkipWatermarkState, PkPrefixSkipWatermarkIterator, PkPrefixSkipWatermarkState,
44    UserIterator,
45};
46use crate::hummock::multi_builder::TableBuilderFactory;
47use crate::hummock::sstable::DEFAULT_ENTRY_SIZE;
48use crate::hummock::{
49    CachePolicy, FilterBuilder, GetObjectId, HummockResult, MemoryLimiter, SstableBuilder,
50    SstableBuilderOptions, SstableWriterFactory, SstableWriterOptions,
51};
52use crate::monitor::StoreLocalStatistic;
53
/// Builds [`SstableBuilder`]s for compaction output: every `open_builder` call
/// fetches a fresh SST object id through `object_id_getter` and creates a
/// writer via `sstable_writer_factory`.
pub struct RemoteBuilderFactory<W: SstableWriterFactory, F: FilterBuilder> {
    /// Source of new SST object ids (remote call; its latency is accumulated
    /// into `remote_rpc_cost`).
    pub object_id_getter: Box<dyn GetObjectId>,
    /// Memory limiter handed to each created builder.
    pub limiter: Arc<MemoryLimiter>,
    /// Builder options (capacity, block capacity, bloom false-positive rate, ...)
    /// applied to every SST built by this factory.
    pub options: SstableBuilderOptions,
    /// Cache policy passed to the writer of each SST.
    pub policy: CachePolicy,
    /// Accumulated microseconds spent fetching new object ids.
    pub remote_rpc_cost: Arc<AtomicU64>,
    /// Catalog agent forwarded to each builder.
    pub compaction_catalog_agent_ref: CompactionCatalogAgentRef,
    /// Factory producing the actual SST writers.
    pub sstable_writer_factory: W,
    /// Marker tying this factory to its filter-builder type `F`.
    pub _phantom: PhantomData<F>,
}
64
65#[async_trait::async_trait]
66impl<W: SstableWriterFactory, F: FilterBuilder> TableBuilderFactory for RemoteBuilderFactory<W, F> {
67    type Filter = F;
68    type Writer = W::Writer;
69
70    async fn open_builder(&mut self) -> HummockResult<SstableBuilder<Self::Writer, Self::Filter>> {
71        let timer = Instant::now();
72        let table_id = self.object_id_getter.get_new_sst_object_id().await?;
73        let cost = (timer.elapsed().as_secs_f64() * 1000000.0).round() as u64;
74        self.remote_rpc_cost.fetch_add(cost, Ordering::Relaxed);
75        let writer_options = SstableWriterOptions {
76            capacity_hint: Some(self.options.capacity + self.options.block_capacity),
77            tracker: None,
78            policy: self.policy,
79        };
80        let writer = self
81            .sstable_writer_factory
82            .create_sst_writer(table_id, writer_options)
83            .await?;
84        let builder = SstableBuilder::new(
85            table_id,
86            writer,
87            Self::Filter::create(
88                self.options.bloom_false_positive,
89                self.options.capacity / DEFAULT_ENTRY_SIZE + 1,
90            ),
91            self.options.clone(),
92            self.compaction_catalog_agent_ref.clone(),
93            Some(self.limiter.clone()),
94        );
95        Ok(builder)
96    }
97}
98
/// `CompactionStatistics` will count the results of each compact split
#[derive(Default, Debug)]
pub struct CompactionStatistics {
    /// Per-table stats deltas, used to report per-table metrics.
    pub delta_drop_stat: TableStatsMap,

    /// Total keys seen by the compaction iterator; together with
    /// `iter_drop_key_counts` this is used to calculate the delete ratio.
    pub iter_total_key_counts: u64,
    /// Keys dropped by the compaction iterator.
    pub iter_drop_key_counts: u64,
}
109
110impl CompactionStatistics {
111    #[allow(dead_code)]
112    fn delete_ratio(&self) -> Option<u64> {
113        if self.iter_total_key_counts == 0 {
114            return None;
115        }
116
117        Some(self.iter_drop_key_counts / self.iter_total_key_counts)
118    }
119}
120
/// Per-task configuration threaded through the compaction pipeline.
#[derive(Clone, Default)]
pub struct TaskConfig {
    /// Key range this task is responsible for.
    pub key_range: KeyRange,
    /// Cache policy used when writing output blocks.
    pub cache_policy: CachePolicy,
    /// Whether delete tombstones may be dropped from the output.
    /// NOTE(review): presumably true only when compacting to the bottom-most
    /// level — confirm against callers.
    pub gc_delete_keys: bool,
    /// Whether multiple versions of the same user key are kept in the output.
    /// NOTE(review): semantics inferred from the name — verify at call sites.
    pub retain_multiple_version: bool,
    /// `stats_target_table_ids` decides whether a dropped key should be counted as table stats
    /// change. For an divided SST as input, a dropped key shouldn't be counted if its table id
    /// doesn't belong to this divided SST. See `Compactor::compact_and_build_sst`.
    pub stats_target_table_ids: Option<HashSet<u32>>,
    /// The kind of compaction task being executed.
    pub task_type: PbTaskType,
    /// Whether to build block-based (rather than sstable-level) bloom filters
    /// for the output — see filter construction at call sites.
    pub use_block_based_filter: bool,

    /// `TableId` -> partition number used when splitting output by vnode.
    /// NOTE(review): value semantics inferred from the name — confirm.
    pub table_vnode_partition: BTreeMap<u32, u32>,
    /// `TableId` -> `TableSchema`
    /// Schemas in `table_schemas` are at least as new as the one used to create `input_ssts`.
    /// For a table with schema existing in `table_schemas`, its columns not in `table_schemas` but in `input_ssts` can be safely dropped.
    pub table_schemas: HashMap<u32, PbTableSchema>,
    /// `disable_drop_column_optimization` should only be set in benchmark.
    pub disable_drop_column_optimization: bool,
}
142
143pub fn build_multi_compaction_filter(compact_task: &CompactTask) -> MultiCompactionFilter {
144    let mut multi_filter = MultiCompactionFilter::default();
145    let compaction_filter_flag =
146        CompactionFilterFlag::from_bits(compact_task.compaction_filter_mask).unwrap_or_default();
147    if compaction_filter_flag.contains(CompactionFilterFlag::STATE_CLEAN) {
148        let state_clean_up_filter = Box::new(StateCleanUpCompactionFilter::new(
149            HashSet::from_iter(compact_task.existing_table_ids.clone()),
150        ));
151
152        multi_filter.register(state_clean_up_filter);
153    }
154
155    if compaction_filter_flag.contains(CompactionFilterFlag::TTL) {
156        let id_to_ttl = compact_task
157            .table_options
158            .iter()
159            .filter_map(|(id, option)| {
160                option
161                    .retention_seconds
162                    .and_then(|ttl| if ttl > 0 { Some((*id, ttl)) } else { None })
163            })
164            .collect();
165
166        let ttl_filter = Box::new(TtlCompactionFilter::new(
167            id_to_ttl,
168            compact_task.current_epoch_time,
169        ));
170        multi_filter.register(ttl_filter);
171    }
172
173    multi_filter
174}
175
176fn generate_splits_fast(
177    sstable_infos: &Vec<SstableInfo>,
178    compaction_size: u64,
179    context: &CompactorContext,
180    max_sub_compaction: u32,
181) -> Vec<KeyRange> {
182    let worker_num = context.compaction_executor.worker_num();
183    let parallel_compact_size = (context.storage_opts.parallel_compact_size_mb as u64) << 20;
184
185    let parallelism = calculate_task_parallelism_impl(
186        worker_num,
187        parallel_compact_size,
188        compaction_size,
189        max_sub_compaction,
190    );
191    let mut indexes = vec![];
192    for sst in sstable_infos {
193        let key_range = &sst.key_range;
194        indexes.push(
195            FullKey {
196                user_key: FullKey::decode(&key_range.left).user_key,
197                epoch_with_gap: EpochWithGap::new_max_epoch(),
198            }
199            .encode(),
200        );
201        indexes.push(
202            FullKey {
203                user_key: FullKey::decode(&key_range.right).user_key,
204                epoch_with_gap: EpochWithGap::new_max_epoch(),
205            }
206            .encode(),
207        );
208    }
209    indexes.sort_by(|a, b| KeyComparator::compare_encoded_full_key(a.as_ref(), b.as_ref()));
210    indexes.dedup();
211    if indexes.len() <= parallelism {
212        return vec![];
213    }
214
215    let mut splits = vec![];
216    splits.push(KeyRange::default());
217    let parallel_key_count = indexes.len() / parallelism;
218    let mut last_split_key_count = 0;
219    for key in indexes {
220        if last_split_key_count >= parallel_key_count {
221            splits.last_mut().unwrap().right = Bytes::from(key.clone());
222            splits.push(KeyRange::new(Bytes::from(key.clone()), Bytes::default()));
223            last_split_key_count = 0;
224        }
225        last_split_key_count += 1;
226    }
227
228    splits
229}
230
231pub async fn generate_splits(
232    sstable_infos: &Vec<SstableInfo>,
233    compaction_size: u64,
234    context: &CompactorContext,
235    max_sub_compaction: u32,
236) -> HummockResult<Vec<KeyRange>> {
237    let parallel_compact_size = (context.storage_opts.parallel_compact_size_mb as u64) << 20;
238    if compaction_size > parallel_compact_size {
239        if sstable_infos.len() > context.storage_opts.compactor_max_preload_meta_file_count {
240            return Ok(generate_splits_fast(
241                sstable_infos,
242                compaction_size,
243                context,
244                max_sub_compaction,
245            ));
246        }
247        let mut indexes = vec![];
248        // preload the meta and get the smallest key to split sub_compaction
249        for sstable_info in sstable_infos {
250            indexes.extend(
251                context
252                    .sstable_store
253                    .sstable(sstable_info, &mut StoreLocalStatistic::default())
254                    .await?
255                    .meta
256                    .block_metas
257                    .iter()
258                    .map(|block| {
259                        let data_size = block.len;
260                        let full_key = FullKey {
261                            user_key: FullKey::decode(&block.smallest_key).user_key,
262                            epoch_with_gap: EpochWithGap::new_max_epoch(),
263                        }
264                        .encode();
265                        (data_size as u64, full_key)
266                    })
267                    .collect_vec(),
268            );
269        }
270        // sort by key, as for every data block has the same size;
271        indexes.sort_by(|a, b| KeyComparator::compare_encoded_full_key(a.1.as_ref(), b.1.as_ref()));
272        let mut splits = vec![];
273        splits.push(KeyRange::default());
274
275        let parallelism = calculate_task_parallelism_impl(
276            context.compaction_executor.worker_num(),
277            parallel_compact_size,
278            compaction_size,
279            max_sub_compaction,
280        );
281
282        let sub_compaction_data_size =
283            std::cmp::max(compaction_size / parallelism as u64, parallel_compact_size);
284
285        if parallelism > 1 {
286            let mut last_buffer_size = 0;
287            let mut last_key: Vec<u8> = vec![];
288            let mut remaining_size = indexes.iter().map(|block| block.0).sum::<u64>();
289            for (data_size, key) in indexes {
290                if last_buffer_size >= sub_compaction_data_size
291                    && !last_key.eq(&key)
292                    && remaining_size > parallel_compact_size
293                {
294                    splits.last_mut().unwrap().right = Bytes::from(key.clone());
295                    splits.push(KeyRange::new(Bytes::from(key.clone()), Bytes::default()));
296                    last_buffer_size = data_size;
297                } else {
298                    last_buffer_size += data_size;
299                }
300                remaining_size -= data_size;
301                last_key = key;
302            }
303            return Ok(splits);
304        }
305    }
306
307    Ok(vec![])
308}
309
310pub fn estimate_task_output_capacity(context: CompactorContext, task: &CompactTask) -> usize {
311    let max_target_file_size = context.storage_opts.sstable_size_mb as usize * (1 << 20);
312    let total_input_uncompressed_file_size = task
313        .input_ssts
314        .iter()
315        .flat_map(|level| level.table_infos.iter())
316        .map(|table| table.uncompressed_file_size)
317        .sum::<u64>();
318
319    let capacity = std::cmp::min(task.target_file_size as usize, max_target_file_size);
320    std::cmp::min(capacity, total_input_uncompressed_file_size as usize)
321}
322
/// Compare the result of a compaction task against its input. The data seen by
/// users shall not change after applying the compaction result.
///
/// Returns `Ok(true)` when the check passes or is skipped (TTL tasks and tasks
/// cleaning dropped state tables legitimately drop keys, so they cannot be
/// verified this way), `Ok(false)` when input and output disagree.
pub async fn check_compaction_result(
    compact_task: &CompactTask,
    context: CompactorContext,
    compaction_catalog_agent_ref: CompactionCatalogAgentRef,
) -> HummockResult<bool> {
    let mut table_ids_from_input_ssts = compact_task.get_table_ids_from_input_ssts();
    let need_clean_state_table = table_ids_from_input_ssts
        .any(|table_id| !compact_task.existing_table_ids.contains(&table_id));
    // This check method does not consider dropped keys by compaction filter.
    if compact_task.contains_ttl() || need_clean_state_table {
        return Ok(true);
    }

    // One iterator per non-overlapping level, but one iterator per file for
    // overlapping levels so the merge iterator can interleave them correctly.
    let mut table_iters = Vec::new();
    for level in &compact_task.input_ssts {
        if level.table_infos.is_empty() {
            continue;
        }

        // Do not need to filter the table because manager has done it.
        if level.level_type == PbLevelType::Nonoverlapping {
            debug_assert!(can_concat(&level.table_infos));

            table_iters.push(ConcatSstableIterator::new(
                compact_task.existing_table_ids.clone(),
                level.table_infos.clone(),
                KeyRange::inf(),
                context.sstable_store.clone(),
                Arc::new(TaskProgress::default()),
                context.storage_opts.compactor_iter_max_io_retry_times,
            ));
        } else {
            for table_info in &level.table_infos {
                table_iters.push(ConcatSstableIterator::new(
                    compact_task.existing_table_ids.clone(),
                    vec![table_info.clone()],
                    KeyRange::inf(),
                    context.sstable_store.clone(),
                    Arc::new(TaskProgress::default()),
                    context.storage_opts.compactor_iter_max_io_retry_times,
                ));
            }
        }
    }

    // Left side: merged view over all inputs, wrapped in the same
    // watermark-skipping iterators the compaction applies, so keys dropped by
    // watermarks do not show up as spurious differences.
    let iter = MergeIterator::for_compactor(table_iters);
    let left_iter = {
        let skip_watermark_iter = PkPrefixSkipWatermarkIterator::new(
            iter,
            PkPrefixSkipWatermarkState::from_safe_epoch_watermarks(
                compact_task.pk_prefix_table_watermarks.clone(),
            ),
        );

        let combine_iter = NonPkPrefixSkipWatermarkIterator::new(
            skip_watermark_iter,
            NonPkPrefixSkipWatermarkState::from_safe_epoch_watermarks(
                compact_task.non_pk_prefix_table_watermarks.clone(),
                compaction_catalog_agent_ref.clone(),
            ),
        );

        UserIterator::new(
            combine_iter,
            (Bound::Unbounded, Bound::Unbounded),
            u64::MAX,
            0,
            None,
        )
    };
    // Right side: the task's sorted output SSTs, wrapped identically.
    let iter = ConcatSstableIterator::new(
        compact_task.existing_table_ids.clone(),
        compact_task.sorted_output_ssts.clone(),
        KeyRange::inf(),
        context.sstable_store.clone(),
        Arc::new(TaskProgress::default()),
        context.storage_opts.compactor_iter_max_io_retry_times,
    );
    let right_iter = {
        let skip_watermark_iter = PkPrefixSkipWatermarkIterator::new(
            iter,
            PkPrefixSkipWatermarkState::from_safe_epoch_watermarks(
                compact_task.pk_prefix_table_watermarks.clone(),
            ),
        );

        let combine_iter = NonPkPrefixSkipWatermarkIterator::new(
            skip_watermark_iter,
            NonPkPrefixSkipWatermarkState::from_safe_epoch_watermarks(
                compact_task.non_pk_prefix_table_watermarks.clone(),
                compaction_catalog_agent_ref,
            ),
        );

        UserIterator::new(
            combine_iter,
            (Bound::Unbounded, Bound::Unbounded),
            u64::MAX,
            0,
            None,
        )
    };

    check_result(left_iter, right_iter).await
}
429
430pub async fn check_flush_result<I: HummockIterator<Direction = Forward>>(
431    left_iter: UserIterator<I>,
432    existing_table_ids: Vec<StateTableId>,
433    sort_ssts: Vec<SstableInfo>,
434    context: CompactorContext,
435) -> HummockResult<bool> {
436    let iter = ConcatSstableIterator::new(
437        existing_table_ids.clone(),
438        sort_ssts.clone(),
439        KeyRange::inf(),
440        context.sstable_store.clone(),
441        Arc::new(TaskProgress::default()),
442        0,
443    );
444    let right_iter = UserIterator::new(
445        iter,
446        (Bound::Unbounded, Bound::Unbounded),
447        u64::MAX,
448        0,
449        None,
450    );
451    check_result(left_iter, right_iter).await
452}
453
454async fn check_result<
455    I1: HummockIterator<Direction = Forward>,
456    I2: HummockIterator<Direction = Forward>,
457>(
458    mut left_iter: UserIterator<I1>,
459    mut right_iter: UserIterator<I2>,
460) -> HummockResult<bool> {
461    left_iter.rewind().await?;
462    right_iter.rewind().await?;
463    let mut right_count = 0;
464    let mut left_count = 0;
465    while left_iter.is_valid() && right_iter.is_valid() {
466        if left_iter.key() != right_iter.key() {
467            tracing::error!(
468                "The key of input and output not equal. key: {:?} vs {:?}",
469                left_iter.key(),
470                right_iter.key()
471            );
472            return Ok(false);
473        }
474        if left_iter.value() != right_iter.value() {
475            tracing::error!(
476                "The value of input and output not equal. key: {:?}, value: {:?} vs {:?}",
477                left_iter.key(),
478                left_iter.value(),
479                right_iter.value()
480            );
481            return Ok(false);
482        }
483        left_iter.next().await?;
484        right_iter.next().await?;
485        left_count += 1;
486        right_count += 1;
487    }
488    while left_iter.is_valid() {
489        left_count += 1;
490        left_iter.next().await?;
491    }
492    while right_iter.is_valid() {
493        right_count += 1;
494        right_iter.next().await?;
495    }
496    if left_count != right_count {
497        tracing::error!(
498            "The key count of input and output not equal: {} vs {}",
499            left_count,
500            right_count
501        );
502        return Ok(false);
503    }
504    Ok(true)
505}
506
507pub fn optimize_by_copy_block(compact_task: &CompactTask, context: &CompactorContext) -> bool {
508    let sstable_infos = compact_task
509        .input_ssts
510        .iter()
511        .flat_map(|level| level.table_infos.iter())
512        .filter(|table_info| {
513            let table_ids = &table_info.table_ids;
514            table_ids
515                .iter()
516                .any(|table_id| compact_task.existing_table_ids.contains(table_id))
517        })
518        .cloned()
519        .collect_vec();
520    let compaction_size = sstable_infos
521        .iter()
522        .map(|table_info| table_info.sst_size)
523        .sum::<u64>();
524
525    let all_ssts_are_blocked_filter = sstable_infos
526        .iter()
527        .all(|table_info| table_info.bloom_filter_kind == BloomFilterType::Blocked);
528
529    let delete_key_count = sstable_infos
530        .iter()
531        .map(|table_info| table_info.stale_key_count + table_info.range_tombstone_count)
532        .sum::<u64>();
533    let total_key_count = sstable_infos
534        .iter()
535        .map(|table_info| table_info.total_key_count)
536        .sum::<u64>();
537
538    let single_table = compact_task.build_compact_table_ids().len() == 1;
539    context.storage_opts.enable_fast_compaction
540        && all_ssts_are_blocked_filter
541        && !compact_task.contains_range_tombstone()
542        && !compact_task.contains_ttl()
543        && !compact_task.contains_split_sst()
544        && single_table
545        && compact_task.target_level > 0
546        && compact_task.input_ssts.len() == 2
547        && compaction_size < context.storage_opts.compactor_fast_max_compact_task_size
548        && delete_key_count * 100
549            < context.storage_opts.compactor_fast_max_compact_delete_ratio as u64 * total_key_count
550        && compact_task.task_type == PbTaskType::Dynamic
551}
552
553pub async fn generate_splits_for_task(
554    compact_task: &mut CompactTask,
555    context: &CompactorContext,
556    optimize_by_copy_block: bool,
557) -> HummockResult<()> {
558    let sstable_infos = compact_task
559        .input_ssts
560        .iter()
561        .flat_map(|level| level.table_infos.iter())
562        .filter(|table_info| {
563            let table_ids = &table_info.table_ids;
564            table_ids
565                .iter()
566                .any(|table_id| compact_task.existing_table_ids.contains(table_id))
567        })
568        .cloned()
569        .collect_vec();
570    let compaction_size = sstable_infos
571        .iter()
572        .map(|table_info| table_info.sst_size)
573        .sum::<u64>();
574
575    if !optimize_by_copy_block {
576        let splits = generate_splits(
577            &sstable_infos,
578            compaction_size,
579            context,
580            compact_task.max_sub_compaction,
581        )
582        .await?;
583        if !splits.is_empty() {
584            compact_task.splits = splits;
585        }
586        return Ok(());
587    }
588
589    Ok(())
590}
591
592pub fn metrics_report_for_task(compact_task: &CompactTask, context: &CompactorContext) {
593    let group_label = compact_task.compaction_group_id.to_string();
594    let cur_level_label = compact_task.input_ssts[0].level_idx.to_string();
595    let select_table_infos = compact_task
596        .input_ssts
597        .iter()
598        .filter(|level| level.level_idx != compact_task.target_level)
599        .flat_map(|level| level.table_infos.iter())
600        .collect_vec();
601    let target_table_infos = compact_task
602        .input_ssts
603        .iter()
604        .filter(|level| level.level_idx == compact_task.target_level)
605        .flat_map(|level| level.table_infos.iter())
606        .collect_vec();
607    let select_size = select_table_infos
608        .iter()
609        .map(|table| table.sst_size)
610        .sum::<u64>();
611    context
612        .compactor_metrics
613        .compact_read_current_level
614        .with_label_values(&[&group_label, &cur_level_label])
615        .inc_by(select_size);
616    context
617        .compactor_metrics
618        .compact_read_sstn_current_level
619        .with_label_values(&[&group_label, &cur_level_label])
620        .inc_by(select_table_infos.len() as u64);
621
622    let target_level_read_bytes = target_table_infos.iter().map(|t| t.sst_size).sum::<u64>();
623    let next_level_label = compact_task.target_level.to_string();
624    context
625        .compactor_metrics
626        .compact_read_next_level
627        .with_label_values(&[&group_label, next_level_label.as_str()])
628        .inc_by(target_level_read_bytes);
629    context
630        .compactor_metrics
631        .compact_read_sstn_next_level
632        .with_label_values(&[&group_label, next_level_label.as_str()])
633        .inc_by(target_table_infos.len() as u64);
634}
635
636pub fn calculate_task_parallelism(compact_task: &CompactTask, context: &CompactorContext) -> usize {
637    let optimize_by_copy_block = optimize_by_copy_block(compact_task, context);
638
639    if optimize_by_copy_block {
640        return 1;
641    }
642
643    let sstable_infos = compact_task
644        .input_ssts
645        .iter()
646        .flat_map(|level| level.table_infos.iter())
647        .filter(|table_info| {
648            let table_ids = &table_info.table_ids;
649            table_ids
650                .iter()
651                .any(|table_id| compact_task.existing_table_ids.contains(table_id))
652        })
653        .cloned()
654        .collect_vec();
655    let compaction_size = sstable_infos
656        .iter()
657        .map(|table_info| table_info.sst_size)
658        .sum::<u64>();
659    let parallel_compact_size = (context.storage_opts.parallel_compact_size_mb as u64) << 20;
660    calculate_task_parallelism_impl(
661        context.compaction_executor.worker_num(),
662        parallel_compact_size,
663        compaction_size,
664        compact_task.max_sub_compaction,
665    )
666}
667
/// Compute sub-compaction parallelism: the number of `parallel_compact_size`
/// chunks in `compaction_size` (rounded up), clamped first by the per-task
/// sub-compaction cap and then by the number of available workers.
pub fn calculate_task_parallelism_impl(
    worker_num: usize,
    parallel_compact_size: u64,
    compaction_size: u64,
    max_sub_compaction: u32,
) -> usize {
    let chunks = compaction_size.div_ceil(parallel_compact_size);
    let capped = chunks.min(max_sub_compaction as u64) as usize;
    capped.min(worker_num)
}