risingwave_storage/hummock/compactor/compaction_utils.rs

// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeMap, HashMap};
use std::marker::PhantomData;
use std::ops::Bound;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};

use bytes::Bytes;
use itertools::Itertools;
use risingwave_common::catalog::TableId;
use risingwave_common::constants::hummock::CompactionFilterFlag;
use risingwave_hummock_sdk::compact_task::CompactTask;
use risingwave_hummock_sdk::key::FullKey;
use risingwave_hummock_sdk::key_range::KeyRange;
use risingwave_hummock_sdk::sstable_info::SstableInfo;
use risingwave_hummock_sdk::table_stats::TableStatsMap;
use risingwave_hummock_sdk::{EpochWithGap, KeyComparator, can_concat};
use risingwave_pb::hummock::compact_task::PbTaskType;
use risingwave_pb::hummock::{BloomFilterType, PbLevelType, PbTableSchema};
use tokio::time::Instant;

pub use super::context::CompactorContext;
use crate::compaction_catalog_manager::CompactionCatalogAgentRef;
use crate::hummock::compactor::{
    ConcatSstableIterator, MultiCompactionFilter, TaskProgress, TtlCompactionFilter,
};
use crate::hummock::iterator::{
    Forward, HummockIterator, MergeIterator, NonPkPrefixSkipWatermarkIterator,
    NonPkPrefixSkipWatermarkState, PkPrefixSkipWatermarkIterator, PkPrefixSkipWatermarkState,
    UserIterator,
};
use crate::hummock::multi_builder::TableBuilderFactory;
use crate::hummock::sstable::DEFAULT_ENTRY_SIZE;
use crate::hummock::{
    CachePolicy, FilterBuilder, GetObjectId, HummockResult, MemoryLimiter, SstableBuilder,
    SstableBuilderOptions, SstableWriterFactory, SstableWriterOptions,
};
use crate::monitor::StoreLocalStatistic;

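/// Factory that opens remote `SstableBuilder`s for a compaction task: it fetches a new SST
/// object id (tracking the RPC latency in `remote_rpc_cost`, in microseconds), creates a
/// writer via `sstable_writer_factory`, and wires in the bloom-filter builder and memory
/// limiter.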
pub struct RemoteBuilderFactory<W: SstableWriterFactory, F: FilterBuilder> {
    pub object_id_getter: Arc<dyn GetObjectId>,
    pub limiter: Arc<MemoryLimiter>,
    pub options: SstableBuilderOptions,
    pub policy: CachePolicy,
    pub remote_rpc_cost: Arc<AtomicU64>,
    pub compaction_catalog_agent_ref: CompactionCatalogAgentRef,
    pub sstable_writer_factory: W,
    pub _phantom: PhantomData<F>,
}

#[async_trait::async_trait]
impl<W: SstableWriterFactory, F: FilterBuilder> TableBuilderFactory for RemoteBuilderFactory<W, F> {
    type Filter = F;
    type Writer = W::Writer;

    async fn open_builder(&mut self) -> HummockResult<SstableBuilder<Self::Writer, Self::Filter>> {
        let timer = Instant::now();
        let table_id = self.object_id_getter.get_new_sst_object_id().await?;
        // Record the object-id RPC latency in microseconds.
        let cost = (timer.elapsed().as_secs_f64() * 1_000_000.0).round() as u64;
        self.remote_rpc_cost.fetch_add(cost, Ordering::Relaxed);
        let writer_options = SstableWriterOptions {
            capacity_hint: Some(self.options.capacity + self.options.block_capacity),
            tracker: None,
            policy: self.policy,
        };
        let writer = self
            .sstable_writer_factory
            .create_sst_writer(table_id, writer_options)
            .await?;
        let builder = SstableBuilder::new(
            table_id,
            writer,
            Self::Filter::create(
                self.options.bloom_false_positive,
                self.options.capacity / DEFAULT_ENTRY_SIZE + 1,
            ),
            self.options.clone(),
            self.compaction_catalog_agent_ref.clone(),
            Some(self.limiter.clone()),
        );
        Ok(builder)
    }
}

/// `CompactionStatistics` counts the results of each compaction split.
#[derive(Default, Debug)]
pub struct CompactionStatistics {
    // to report per-table metrics
    pub delta_drop_stat: TableStatsMap,

    // to calculate delete ratio
    pub iter_total_key_counts: u64,
    pub iter_drop_key_counts: u64,
}

impl CompactionStatistics {
    #[expect(dead_code)]
    fn delete_ratio(&self) -> Option<u64> {
        if self.iter_total_key_counts == 0 {
            return None;
        }

        Some(self.iter_drop_key_counts / self.iter_total_key_counts)
    }
}

#[derive(Clone, Default)]
pub struct TaskConfig {
    pub(crate) key_range: KeyRange,
    pub(crate) cache_policy: CachePolicy,
    pub(crate) gc_delete_keys: bool,
    pub(crate) retain_multiple_version: bool,
    pub(crate) use_block_based_filter: bool,

    pub(crate) table_vnode_partition: BTreeMap<TableId, u32>,
    /// `TableId` -> `TableSchema`
    /// Schemas in `table_schemas` are at least as new as the ones used to create `input_ssts`.
    /// For a table whose schema exists in `table_schemas`, any column that appears in
    /// `input_ssts` but not in `table_schemas` can be safely dropped.
    pub(crate) table_schemas: HashMap<TableId, PbTableSchema>,
    /// `disable_drop_column_optimization` should only be set in benchmarks.
    pub(crate) disable_drop_column_optimization: bool,
}

impl TaskConfig {
    #[cfg(any(test, feature = "test"))]
    pub fn for_test(
        key_range: KeyRange,
        cache_policy: CachePolicy,
        gc_delete_keys: bool,
        use_block_based_filter: bool,
        table_schemas: HashMap<TableId, PbTableSchema>,
    ) -> Self {
        Self {
            key_range,
            cache_policy,
            gc_delete_keys,
            retain_multiple_version: false,
            use_block_based_filter,
            table_vnode_partition: BTreeMap::default(),
            table_schemas,
            disable_drop_column_optimization: false,
        }
    }

    #[cfg(any(test, feature = "test"))]
    pub fn with_disable_drop_column_optimization(mut self, disable: bool) -> Self {
        self.disable_drop_column_optimization = disable;
        self
    }
}

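/// Builds the task's compaction filter chain from `compaction_filter_mask`. Currently only
/// the TTL filter is registered, configured with each table's positive `retention_seconds`.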
pub fn build_multi_compaction_filter(compact_task: &CompactTask) -> MultiCompactionFilter {
    let mut multi_filter = MultiCompactionFilter::default();
    let compaction_filter_flag =
        CompactionFilterFlag::from_bits(compact_task.compaction_filter_mask).unwrap_or_default();
    // STATE_CLEAN is handled by compact-task table-id normalization and iterator-level block
    // pruning. Keep the flag as a no-op here for compatibility with existing task configs.

    if compaction_filter_flag.contains(CompactionFilterFlag::TTL) {
        let id_to_ttl = compact_task
            .table_options
            .iter()
            .filter_map(|(id, option)| {
                option
                    .retention_seconds
                    .and_then(|ttl| if ttl > 0 { Some((*id, ttl)) } else { None })
            })
            .collect();

        let ttl_filter = Box::new(TtlCompactionFilter::new(
            id_to_ttl,
            compact_task.current_epoch_time,
        ));
        multi_filter.register(ttl_filter);
    }

    multi_filter
}

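/// Fast-path split generation: instead of preloading every SSTable meta, it samples only the
/// boundary keys of each input SST and cuts them into `parallelism` ranges of roughly equal
/// key counts. Used when the input set is too large to preload block metas cheaply.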
fn generate_splits_fast(
    sstable_infos: &[&SstableInfo],
    compaction_size: u64,
    context: &CompactorContext,
    max_sub_compaction: u32,
) -> Vec<KeyRange> {
    let worker_num = context.compaction_executor.worker_num();
    let parallel_compact_size = (context.storage_opts.parallel_compact_size_mb as u64) << 20;

    let parallelism = calculate_task_parallelism_impl(
        worker_num,
        parallel_compact_size,
        compaction_size,
        max_sub_compaction,
    );
    let mut indexes = vec![];
    for sst in sstable_infos {
        let key_range = &sst.key_range;
        indexes.push(
            FullKey {
                user_key: FullKey::decode(&key_range.left).user_key,
                epoch_with_gap: EpochWithGap::new_max_epoch(),
            }
            .encode(),
        );
        indexes.push(
            FullKey {
                user_key: FullKey::decode(&key_range.right).user_key,
                epoch_with_gap: EpochWithGap::new_max_epoch(),
            }
            .encode(),
        );
    }
    indexes.sort_by(|a, b| KeyComparator::compare_encoded_full_key(a.as_ref(), b.as_ref()));
    indexes.dedup();
    if indexes.len() <= parallelism {
        return vec![];
    }

    let mut splits = vec![];
    splits.push(KeyRange::default());
    let parallel_key_count = indexes.len() / parallelism;
    let mut last_split_key_count = 0;
    for key in indexes {
        if last_split_key_count >= parallel_key_count {
            splits.last_mut().unwrap().right = Bytes::from(key.clone());
            splits.push(KeyRange::new(Bytes::from(key.clone()), Bytes::default()));
            last_split_key_count = 0;
        }
        last_split_key_count += 1;
    }

    splits
}

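/// Generates key ranges for running a compaction task as parallel sub-compactions. When the
/// task exceeds `parallel_compact_size`, block metas are preloaded and block boundary keys are
/// accumulated until each split holds about `compaction_size / parallelism` bytes. Returns an
/// empty vector when the task is too small to be worth splitting.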
pub async fn generate_splits(
    sstable_infos: &[&SstableInfo],
    compaction_size: u64,
    context: &CompactorContext,
    max_sub_compaction: u32,
) -> HummockResult<Vec<KeyRange>> {
    let parallel_compact_size = (context.storage_opts.parallel_compact_size_mb as u64) << 20;
    if compaction_size > parallel_compact_size {
        if sstable_infos.len() > context.storage_opts.compactor_max_preload_meta_file_count {
            return Ok(generate_splits_fast(
                sstable_infos,
                compaction_size,
                context,
                max_sub_compaction,
            ));
        }
        let mut indexes = vec![];
        // Preload the SSTable metas and collect each block's smallest key for splitting
        // sub-compactions.
        for sstable_info in sstable_infos {
            let sstable = context
                .sstable_store
                .sstable(sstable_info, &mut StoreLocalStatistic::default())
                .await?;
            indexes.extend(sstable.meta.block_metas.iter().map(|block| {
                let data_size = block.len;
                let full_key = FullKey {
                    user_key: FullKey::decode(&block.smallest_key).user_key,
                    epoch_with_gap: EpochWithGap::new_max_epoch(),
                }
                .encode();
                (data_size as u64, full_key)
            }));
        }
        // Sort by key; every data block has roughly the same size, so accumulating block sizes
        // yields balanced splits.
        indexes.sort_by(|a, b| KeyComparator::compare_encoded_full_key(a.1.as_ref(), b.1.as_ref()));
        let mut splits = vec![];
        splits.push(KeyRange::default());

        let parallelism = calculate_task_parallelism_impl(
            context.compaction_executor.worker_num(),
            parallel_compact_size,
            compaction_size,
            max_sub_compaction,
        );

        let sub_compaction_data_size =
            std::cmp::max(compaction_size / parallelism as u64, parallel_compact_size);

        if parallelism > 1 {
            let mut last_buffer_size = 0;
            let mut last_key: Vec<u8> = vec![];
            let mut remaining_size = indexes.iter().map(|block| block.0).sum::<u64>();
            for (data_size, key) in indexes {
                if last_buffer_size >= sub_compaction_data_size
                    && !last_key.eq(&key)
                    && remaining_size > parallel_compact_size
                {
                    splits.last_mut().unwrap().right = Bytes::from(key.clone());
                    splits.push(KeyRange::new(Bytes::from(key.clone()), Bytes::default()));
                    last_buffer_size = data_size;
                } else {
                    last_buffer_size += data_size;
                }
                remaining_size -= data_size;
                last_key = key;
            }
            return Ok(splits);
        }
    }

    Ok(vec![])
}

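/// Estimates the per-SST output capacity for a task: the smallest of the task's
/// `target_file_size`, the configured `sstable_size_mb`, and the total uncompressed input size.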
pub fn estimate_task_output_capacity(context: CompactorContext, task: &CompactTask) -> usize {
    let max_target_file_size = context.storage_opts.sstable_size_mb as usize * (1 << 20);
    let total_input_uncompressed_file_size = task
        .read_input_ssts()
        .map(|table| table.uncompressed_file_size)
        .sum::<u64>();

    let capacity = std::cmp::min(task.target_file_size as usize, max_target_file_size);
    std::cmp::min(capacity, total_input_uncompressed_file_size as usize)
}

/// Compares the output of a compaction task with its input. The data seen by users must not
/// change after the compaction result is applied.
pub async fn check_compaction_result(
    compact_task: &CompactTask,
    context: CompactorContext,
    compaction_catalog_agent_ref: CompactionCatalogAgentRef,
) -> HummockResult<bool> {
    // This check does not account for keys dropped by the compaction filter, so skip TTL tasks.
    if compact_task.contains_ttl() {
        return Ok(true);
    }

    let mut table_iters = Vec::new();

    for level in &compact_task.input_ssts {
        if level.level_type == PbLevelType::Nonoverlapping {
            let tables = level.read_sstable_infos().cloned().collect_vec();
            if tables.is_empty() {
                continue;
            }
            debug_assert!(can_concat(&tables));

            table_iters.push(ConcatSstableIterator::new(
                tables,
                KeyRange::inf(),
                context.sstable_store.clone(),
                Arc::new(TaskProgress::default()),
                context.storage_opts.compactor_iter_max_io_retry_times,
            ));
        } else {
            for table_info in level.read_sstable_infos().cloned() {
                table_iters.push(ConcatSstableIterator::new(
                    vec![table_info],
                    KeyRange::inf(),
                    context.sstable_store.clone(),
                    Arc::new(TaskProgress::default()),
                    context.storage_opts.compactor_iter_max_io_retry_times,
                ));
            }
        }
    }

    let iter = MergeIterator::for_compactor(table_iters);
    let left_iter = {
        let skip_watermark_iter = PkPrefixSkipWatermarkIterator::new(
            iter,
            PkPrefixSkipWatermarkState::from_safe_epoch_watermarks(
                compact_task.pk_prefix_table_watermarks.clone(),
            ),
        );

        let combine_iter = NonPkPrefixSkipWatermarkIterator::new(
            skip_watermark_iter,
            NonPkPrefixSkipWatermarkState::from_safe_epoch_watermarks(
                compact_task.non_pk_prefix_table_watermarks.clone(),
                compaction_catalog_agent_ref.clone(),
            ),
        );

        UserIterator::new(
            combine_iter,
            (Bound::Unbounded, Bound::Unbounded),
            u64::MAX,
            0,
            None,
        )
    };
    let iter = ConcatSstableIterator::new(
        compact_task.sorted_output_ssts.clone(),
        KeyRange::inf(),
        context.sstable_store.clone(),
        Arc::new(TaskProgress::default()),
        context.storage_opts.compactor_iter_max_io_retry_times,
    );
    let right_iter = {
        let skip_watermark_iter = PkPrefixSkipWatermarkIterator::new(
            iter,
            PkPrefixSkipWatermarkState::from_safe_epoch_watermarks(
                compact_task.pk_prefix_table_watermarks.clone(),
            ),
        );

        let combine_iter = NonPkPrefixSkipWatermarkIterator::new(
            skip_watermark_iter,
            NonPkPrefixSkipWatermarkState::from_safe_epoch_watermarks(
                compact_task.non_pk_prefix_table_watermarks.clone(),
                compaction_catalog_agent_ref,
            ),
        );

        UserIterator::new(
            combine_iter,
            (Bound::Unbounded, Bound::Unbounded),
            u64::MAX,
            0,
            None,
        )
    };

    check_result(left_iter, right_iter).await
}

pub async fn check_flush_result<I: HummockIterator<Direction = Forward>>(
    left_iter: UserIterator<I>,
    sort_ssts: Vec<SstableInfo>,
    context: CompactorContext,
) -> HummockResult<bool> {
    let iter = ConcatSstableIterator::new(
        sort_ssts,
        KeyRange::inf(),
        context.sstable_store.clone(),
        Arc::new(TaskProgress::default()),
        0,
    );
    let right_iter = UserIterator::new(
        iter,
        (Bound::Unbounded, Bound::Unbounded),
        u64::MAX,
        0,
        None,
    );
    check_result(left_iter, right_iter).await
}

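/// Walks both user iterators in lockstep and reports a mismatch in keys, values, or total key
/// count. Returns `Ok(false)` on the first difference found.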
async fn check_result<
    I1: HummockIterator<Direction = Forward>,
    I2: HummockIterator<Direction = Forward>,
>(
    mut left_iter: UserIterator<I1>,
    mut right_iter: UserIterator<I2>,
) -> HummockResult<bool> {
    left_iter.rewind().await?;
    right_iter.rewind().await?;
    let mut right_count = 0;
    let mut left_count = 0;
    while left_iter.is_valid() && right_iter.is_valid() {
        if left_iter.key() != right_iter.key() {
            tracing::error!(
                "The keys of input and output are not equal: {:?} vs {:?}",
                left_iter.key(),
                right_iter.key()
            );
            return Ok(false);
        }
        if left_iter.value() != right_iter.value() {
            tracing::error!(
                "The values of input and output are not equal. key: {:?}, value: {:?} vs {:?}",
                left_iter.key(),
                left_iter.value(),
                right_iter.value()
            );
            return Ok(false);
        }
        left_iter.next().await?;
        right_iter.next().await?;
        left_count += 1;
        right_count += 1;
    }
    while left_iter.is_valid() {
        left_count += 1;
        left_iter.next().await?;
    }
    while right_iter.is_valid() {
        right_count += 1;
        right_iter.next().await?;
    }
    if left_count != right_count {
        tracing::error!(
            "The key counts of input and output are not equal: {} vs {}",
            left_count,
            right_count
        );
        return Ok(false);
    }
    Ok(true)
}

pub fn optimize_by_copy_block(compact_task: &CompactTask, context: &CompactorContext) -> bool {
    let input_ssts = compact_task.read_input_ssts().collect_vec();
    let compaction_size = input_ssts_size(&input_ssts);
    optimize_by_copy_block_with_input(compact_task, context, &input_ssts, compaction_size)
}

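/// Decides whether the fast "copy block" compaction path applies. All of the following must
/// hold: fast compaction is enabled, every input SST uses a blocked bloom filter, the task has
/// no range tombstones, TTL, or split SSTs, it touches a single table across exactly two input
/// levels with a non-L0 target, its size and stale-key ratio are under the configured
/// thresholds, and it is a dynamic-type task.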
fn optimize_by_copy_block_with_input(
    compact_task: &CompactTask,
    context: &CompactorContext,
    input_ssts: &[&SstableInfo],
    compaction_size: u64,
) -> bool {
    let all_ssts_are_blocked_filter = input_ssts
        .iter()
        .all(|table_info| table_info.bloom_filter_kind == BloomFilterType::Blocked);

    let delete_key_count = input_ssts
        .iter()
        .map(|table_info| table_info.stale_key_count + table_info.range_tombstone_count)
        .sum::<u64>();
    let total_key_count = input_ssts
        .iter()
        .map(|table_info| table_info.total_key_count)
        .sum::<u64>();

    let single_table = compact_task.get_table_ids_from_input_ssts().count() == 1;
    context.storage_opts.enable_fast_compaction
        && all_ssts_are_blocked_filter
        && !compact_task.contains_range_tombstone()
        && !compact_task.contains_ttl()
        && !compact_task.contains_split_sst()
        && single_table
        && compact_task.target_level > 0
        && compact_task.input_ssts.len() == 2
        && compaction_size < context.storage_opts.compactor_fast_max_compact_task_size
        && delete_key_count * 100
            < context.storage_opts.compactor_fast_max_compact_delete_ratio as u64 * total_key_count
        && compact_task.task_type == PbTaskType::Dynamic
}

pub async fn generate_splits_for_task(
    compact_task: &mut CompactTask,
    context: &CompactorContext,
    optimize_by_copy_block: bool,
) -> HummockResult<()> {
    let input_ssts = compact_task.read_input_ssts().collect_vec();
    let compaction_size = input_ssts_size(&input_ssts);

    if !optimize_by_copy_block {
        let splits = generate_splits(
            &input_ssts,
            compaction_size,
            context,
            compact_task.max_sub_compaction,
        )
        .await?;
        if !splits.is_empty() {
            compact_task.splits = splits;
        }
        return Ok(());
    }

    Ok(())
}

pub fn metrics_report_for_task(compact_task: &CompactTask, context: &CompactorContext) {
    let group_label = compact_task.compaction_group_id.to_string();
    let cur_level_label = compact_task.input_ssts[0].level_idx.to_string();

    let (select_size, select_count) = read_sstable_size_and_count(
        compact_task
            .input_ssts
            .iter()
            .filter(|level| level.level_idx != compact_task.target_level)
            .flat_map(|level| level.read_sstable_infos()),
    );
    let (target_level_read_bytes, target_count) = read_sstable_size_and_count(
        compact_task
            .input_ssts
            .iter()
            .filter(|level| level.level_idx == compact_task.target_level)
            .flat_map(|level| level.read_sstable_infos()),
    );

    context
        .compactor_metrics
        .compact_read_current_level
        .with_label_values(&[&group_label, &cur_level_label])
        .inc_by(select_size);
    context
        .compactor_metrics
        .compact_read_sstn_current_level
        .with_label_values(&[&group_label, &cur_level_label])
        .inc_by(select_count as u64);

    let next_level_label = compact_task.target_level.to_string();
    context
        .compactor_metrics
        .compact_read_next_level
        .with_label_values(&[&group_label, &next_level_label])
        .inc_by(target_level_read_bytes);
    context
        .compactor_metrics
        .compact_read_sstn_next_level
        .with_label_values(&[&group_label, &next_level_label])
        .inc_by(target_count as u64);
}

fn read_sstable_size_and_count<'a>(
    sstable_infos: impl IntoIterator<Item = &'a SstableInfo>,
) -> (u64, usize) {
    sstable_infos
        .into_iter()
        .fold((0, 0), |(size, count), table_info| {
            (size + table_info.sst_size, count + 1)
        })
}

pub fn calculate_task_parallelism(compact_task: &CompactTask, context: &CompactorContext) -> usize {
    let input_ssts = compact_task.read_input_ssts().collect_vec();
    let compaction_size = input_ssts_size(&input_ssts);
    let optimize_by_copy_block =
        optimize_by_copy_block_with_input(compact_task, context, &input_ssts, compaction_size);

    if optimize_by_copy_block {
        return 1;
    }

    let parallel_compact_size = (context.storage_opts.parallel_compact_size_mb as u64) << 20;
    calculate_task_parallelism_impl(
        context.compaction_executor.worker_num(),
        parallel_compact_size,
        compaction_size,
        compact_task.max_sub_compaction,
    )
}

fn input_ssts_size(input_ssts: &[&SstableInfo]) -> u64 {
    input_ssts
        .iter()
        .map(|table_info| table_info.sst_size)
        .sum()
}

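/// Computes task parallelism as
/// `min(worker_num, min(ceil(compaction_size / parallel_compact_size), max_sub_compaction))`.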
pub fn calculate_task_parallelism_impl(
    worker_num: usize,
    parallel_compact_size: u64,
    compaction_size: u64,
    max_sub_compaction: u32,
) -> usize {
    let parallelism = compaction_size.div_ceil(parallel_compact_size);
    worker_num.min(parallelism.min(max_sub_compaction as u64) as usize)
}
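
// A minimal illustrative sketch (not part of the original file): exercises the parallelism
// formula above with assumed example values to show how the three caps interact.
#[cfg(test)]
mod parallelism_doc_example {
    use super::calculate_task_parallelism_impl;

    #[test]
    fn parallelism_is_capped_by_workers_and_max_sub_compaction() {
        const MB: u64 = 1 << 20;
        // A 2 GiB task cut into 512 MiB chunks yields 4 splits, within both caps.
        assert_eq!(calculate_task_parallelism_impl(8, 512 * MB, 2048 * MB, 4), 4);
        // A 10 GiB task would yield 20 splits but is capped by `max_sub_compaction = 4`.
        assert_eq!(
            calculate_task_parallelism_impl(8, 512 * MB, 10 * 1024 * MB, 4),
            4
        );
        // A small task always runs as a single split.
        assert_eq!(calculate_task_parallelism_impl(8, 512 * MB, 100 * MB, 4), 1);
    }
}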