risingwave_storage/hummock/compactor/fast_compactor_runner.rs

// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
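//! Fast compaction runner: streams the blocks of the input SSTs and, wherever the key ranges of
//! the two inputs do not interleave, copies the still-compressed raw blocks directly into the
//! output SSTs, falling back to key-by-key merging only where necessary.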

use std::cmp::Ordering;
use std::collections::HashSet;
use std::marker::PhantomData;
use std::sync::atomic::AtomicU64;
use std::sync::{Arc, atomic};
use std::time::Instant;

use await_tree::{InstrumentAwait, SpanExt};
use bytes::Bytes;
use fail::fail_point;
use itertools::Itertools;
use risingwave_common::catalog::TableId;
use risingwave_hummock_sdk::compact_task::CompactTask;
use risingwave_hummock_sdk::key::FullKey;
use risingwave_hummock_sdk::key_range::KeyRange;
use risingwave_hummock_sdk::sstable_info::SstableInfo;
use risingwave_hummock_sdk::table_stats::TableStats;
use risingwave_hummock_sdk::{EpochWithGap, LocalSstableInfo, can_concat, compact_task_to_string};

use crate::compaction_catalog_manager::CompactionCatalogAgentRef;
use crate::hummock::block_stream::BlockDataStream;
use crate::hummock::compactor::task_progress::TaskProgress;
use crate::hummock::compactor::{
    CompactionFilter, CompactionStatistics, Compactor, CompactorContext, MultiCompactionFilter,
    RemoteBuilderFactory, TaskConfig,
};
use crate::hummock::iterator::{
    NonPkPrefixSkipWatermarkState, PkPrefixSkipWatermarkState, SkipWatermarkState,
    ValueSkipWatermarkState,
};
use crate::hummock::multi_builder::{CapacitySplitTableBuilder, TableBuilderFactory};
use crate::hummock::sstable_store::SstableStoreRef;
use crate::hummock::value::HummockValue;
use crate::hummock::{
    Block, BlockBuilder, BlockHolder, BlockIterator, BlockMeta, BlockedXor16FilterBuilder,
    CachePolicy, CompressionAlgorithm, GetObjectId, HummockResult, SstableBuilderOptions,
    TableHolder, UnifiedSstableWriterFactory,
};
use crate::monitor::{CompactorMetrics, StoreLocalStatistic};

/// Iterates over the KV-pairs of an SST while downloading it.
pub struct BlockStreamIterator {
    /// The downloading stream.
    block_stream: Option<BlockDataStream>,

    next_block_index: usize,

    /// For key sanity checks of the divided SST and for debugging.
    sstable: TableHolder,
    iter: Option<BlockIterator>,
    task_progress: Arc<TaskProgress>,

    // For recreating the block stream on I/O retry.
    sstable_store: SstableStoreRef,
    sstable_info: SstableInfo,
    io_retry_times: usize,
    max_io_retry_times: usize,
    stats_ptr: Arc<AtomicU64>,
}

impl BlockStreamIterator {
    // We have to handle two internal iterators.
    //   `block_stream`: iterates over the blocks of the table.
    //     `block_iter`: iterates over the KV-pairs of the current block.
    // These iterators work in different ways.

    // BlockIterator works as follows: after new(), we call seek(), which brings us to the first
    // element. Calling next() then brings us to the second element; next() itself does not
    // return anything.

    // BlockStream follows a different approach: after new(), we do not seek; instead, the first
    // call to next() returns the first value.
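    //
    // An illustrative (not compiled) sketch of the two access patterns, assuming a decoded
    // `block: Box<Block>` and an open `stream: BlockDataStream`:
    //
    //     // BlockIterator: position explicitly, then read.
    //     let mut iter = BlockIterator::new(BlockHolder::from_owned_block(block));
    //     iter.seek_to_first();
    //     while iter.is_valid() {
    //         let (_key, _value) = (iter.key(), iter.value());
    //         iter.next();
    //     }
    //
    //     // BlockDataStream: each call to next_block_impl() yields the next block's data.
    //     while let Some((_data, _)) = stream.next_block_impl().await? {
    //         // decode and consume the block ...
    //     }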

    /// Initialises a new [`BlockStreamIterator`] which iterates over the blocks of the given
    /// sstable, creating the underlying [`BlockDataStream`] lazily on the first read.
    pub fn new(
        sstable: TableHolder,
        task_progress: Arc<TaskProgress>,
        sstable_store: SstableStoreRef,
        sstable_info: SstableInfo,
        max_io_retry_times: usize,
        stats_ptr: Arc<AtomicU64>,
    ) -> Self {
        Self {
            block_stream: None,
            next_block_index: 0,
            sstable,
            iter: None,
            task_progress,
            sstable_store,
            sstable_info,
            io_retry_times: 0,
            max_io_retry_times,
            stats_ptr,
        }
    }

    async fn create_stream(&mut self) -> HummockResult<()> {
        // Fast compaction only supports single-table compaction (the SST is not split),
        // so we don't need to filter the block_metas by table_id and key_range.
        let block_stream = self
            .sstable_store
            .get_stream_for_blocks(
                self.sstable_info.object_id,
                &self.sstable.meta.block_metas[self.next_block_index..],
            )
            .instrument_await("stream_iter_get_stream".verbose())
            .await?;
        self.block_stream = Some(block_stream);
        Ok(())
    }

    /// Wrapper around `self.block_stream.next_block_impl()` which measures the time needed and,
    /// on object-store errors, recreates the stream and retries up to `max_io_retry_times`.
    pub(crate) async fn download_next_block(
        &mut self,
    ) -> HummockResult<Option<(Bytes, Vec<u8>, BlockMeta)>> {
        let now = Instant::now();
        let _time_stat = scopeguard::guard(self.stats_ptr.clone(), |stats_ptr: Arc<AtomicU64>| {
            let add = (now.elapsed().as_secs_f64() * 1000.0).ceil();
            stats_ptr.fetch_add(add as u64, atomic::Ordering::Relaxed);
        });
        loop {
            let ret = match &mut self.block_stream {
                Some(block_stream) => block_stream.next_block_impl().await,
                None => {
                    self.create_stream().await?;
                    continue;
                }
            };
            match ret {
                Ok(Some((data, _))) => {
                    let meta = self.sstable.meta.block_metas[self.next_block_index].clone();
                    let filter_block = self
                        .sstable
                        .filter_reader
                        .get_block_raw_filter(self.next_block_index);
                    self.next_block_index += 1;
                    return Ok(Some((data, filter_block, meta)));
                }

                Ok(None) => break,

                Err(e) => {
                    if !e.is_object_error() || self.io_retry_times >= self.max_io_retry_times {
                        return Err(e);
                    }

                    self.block_stream.take();
                    self.io_retry_times += 1;
                    fail_point!("create_stream_err");

                    tracing::warn!(
                        "fast compact retry create stream for sstable {} times, sstinfo={}",
                        self.io_retry_times,
                        format!(
                            "object_id={}, sst_id={}, meta_offset={}, table_ids={:?}",
                            self.sstable_info.object_id,
                            self.sstable_info.sst_id,
                            self.sstable_info.meta_offset,
                            self.sstable_info.table_ids
                        )
                    );
                }
            }
        }

        self.next_block_index = self.sstable.meta.block_metas.len();
        self.iter.take();
        Ok(None)
    }

    pub(crate) fn init_block_iter(
        &mut self,
        buf: Bytes,
        uncompressed_capacity: usize,
    ) -> HummockResult<()> {
        let block = Block::decode(buf, uncompressed_capacity)?;
        let mut iter = BlockIterator::new(BlockHolder::from_owned_block(Box::new(block)));
        iter.seek_to_first();
        self.iter = Some(iter);
        Ok(())
    }

    fn next_block_smallest(&self) -> &[u8] {
        self.sstable.meta.block_metas[self.next_block_index]
            .smallest_key
            .as_ref()
    }

    fn next_block_largest(&self) -> &[u8] {
        if self.next_block_index + 1 < self.sstable.meta.block_metas.len() {
            self.sstable.meta.block_metas[self.next_block_index + 1]
                .smallest_key
                .as_ref()
        } else {
            self.sstable.meta.largest_key.as_ref()
        }
    }

    fn current_block_largest(&self) -> Vec<u8> {
        if self.next_block_index < self.sstable.meta.block_metas.len() {
            let mut largest_key = FullKey::decode(
                self.sstable.meta.block_metas[self.next_block_index]
                    .smallest_key
                    .as_ref(),
            );
            // Do not include this key, because it is the smallest key of the next block.
            largest_key.epoch_with_gap = EpochWithGap::new_max_epoch();
            largest_key.encode()
        } else {
            self.sstable.meta.largest_key.clone()
        }
    }

    fn key(&self) -> FullKey<&[u8]> {
        match self.iter.as_ref() {
            Some(iter) => iter.key(),
            None => FullKey::decode(
                self.sstable.meta.block_metas[self.next_block_index]
                    .smallest_key
                    .as_ref(),
            ),
        }
    }

    pub(crate) fn is_valid(&self) -> bool {
        self.iter.is_some() || self.next_block_index < self.sstable.meta.block_metas.len()
    }

    #[cfg(test)]
    #[cfg(feature = "failpoints")]
    pub(crate) fn iter_mut(&mut self) -> &mut BlockIterator {
        self.iter.as_mut().unwrap()
    }
}

impl Drop for BlockStreamIterator {
    fn drop(&mut self) {
        self.task_progress.dec_num_pending_read_io();
    }
}

/// Iterates over the KV-pairs of a given list of SSTs. The key-ranges of these SSTs are assumed to
/// be consecutive and non-overlapping.
pub struct ConcatSstableIterator {
    /// The iterator of the current table.
    sstable_iter: Option<BlockStreamIterator>,

    /// Current table index.
    cur_idx: usize,

    /// All non-overlapping tables.
    sstables: Vec<SstableInfo>,

    sstable_store: SstableStoreRef,

    stats: StoreLocalStatistic,
    task_progress: Arc<TaskProgress>,

    max_io_retry_times: usize,
}

impl ConcatSstableIterator {
    /// Caller should make sure that `sst_infos` are non-overlapping and arranged in ascending
    /// key order, since this iterator only moves forward.
    pub fn new(
        sst_infos: Vec<SstableInfo>,
        sstable_store: SstableStoreRef,
        task_progress: Arc<TaskProgress>,
        max_io_retry_times: usize,
    ) -> Self {
        Self {
            sstable_iter: None,
            cur_idx: 0,
            sstables: sst_infos,
            sstable_store,
            task_progress,
            stats: StoreLocalStatistic::default(),
            max_io_retry_times,
        }
    }

    pub async fn rewind(&mut self) -> HummockResult<()> {
        self.seek_idx(0).await
    }

    pub async fn next_sstable(&mut self) -> HummockResult<()> {
        self.seek_idx(self.cur_idx + 1).await
    }

    pub fn current_sstable(&mut self) -> &mut BlockStreamIterator {
        self.sstable_iter.as_mut().unwrap()
    }

    pub async fn init_block_iter(&mut self) -> HummockResult<()> {
        if let Some(sstable) = self.sstable_iter.as_mut() {
            if sstable.iter.is_some() {
                return Ok(());
            }
            let (buf, _, meta) = sstable.download_next_block().await?.unwrap();
            sstable.init_block_iter(buf, meta.uncompressed_size as usize)?;
        }
        Ok(())
    }

    pub fn is_valid(&self) -> bool {
        self.cur_idx < self.sstables.len()
    }

    /// Resets the iterator and loads the SST at index `idx`, if it exists.
    async fn seek_idx(&mut self, idx: usize) -> HummockResult<()> {
        self.sstable_iter.take();
        self.cur_idx = idx;
        if self.cur_idx < self.sstables.len() {
            let sstable_info = &self.sstables[self.cur_idx];
            let sstable = self
                .sstable_store
                .sstable(sstable_info, &mut self.stats)
                .instrument_await("stream_iter_sstable".verbose())
                .await?;
            self.task_progress.inc_num_pending_read_io();

            let sstable_iter = BlockStreamIterator::new(
                sstable,
                self.task_progress.clone(),
                self.sstable_store.clone(),
                sstable_info.clone(),
                self.max_io_retry_times,
                self.stats.remote_io_time.clone(),
            );
            self.sstable_iter = Some(sstable_iter);
        }
        Ok(())
    }
}
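
// An illustrative (not compiled) driving loop for `ConcatSstableIterator`, mirroring how `run`
// below consumes it:
//
//     let mut iter = ConcatSstableIterator::new(sst_infos, sstable_store, task_progress, retry);
//     iter.rewind().await?;
//     while iter.is_valid() {
//         while iter.current_sstable().is_valid() {
//             let (block, _filter, meta) =
//                 iter.current_sstable().download_next_block().await?.unwrap();
//             // either copy the raw block or decode it via `init_block_iter` ...
//         }
//         iter.next_sstable().await?;
//     }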

pub struct CompactorRunner<C: CompactionFilter = MultiCompactionFilter> {
    left: Box<ConcatSstableIterator>,
    right: Box<ConcatSstableIterator>,
    task_id: u64,
    executor: CompactTaskExecutor<
        RemoteBuilderFactory<UnifiedSstableWriterFactory, BlockedXor16FilterBuilder>,
        C,
    >,
    compression_algorithm: CompressionAlgorithm,
    metrics: Arc<CompactorMetrics>,
}

impl<C: CompactionFilter> CompactorRunner<C> {
    pub fn new(
        context: CompactorContext,
        task: CompactTask,
        compaction_catalog_agent_ref: CompactionCatalogAgentRef,
        object_id_getter: Arc<dyn GetObjectId>,
        task_progress: Arc<TaskProgress>,
        compaction_filter: C,
    ) -> Self {
        let mut options: SstableBuilderOptions = context.storage_opts.as_ref().into();
        let compression_algorithm: CompressionAlgorithm = task.compression_algorithm.into();
        options.compression_algorithm = compression_algorithm;
        options.capacity = task.target_file_size as usize;
        let get_id_time = Arc::new(AtomicU64::new(0));

        let key_range = KeyRange::inf();

        let task_config = TaskConfig {
            key_range,
            cache_policy: CachePolicy::NotFill,
            gc_delete_keys: task.gc_delete_keys,
            retain_multiple_version: false,
            stats_target_table_ids: Some(HashSet::from_iter(task.existing_table_ids.clone())),
            task_type: task.task_type,
            table_vnode_partition: task.table_vnode_partition.clone(),
            use_block_based_filter: true,
            table_schemas: Default::default(),
            disable_drop_column_optimization: false,
        };
        let factory = UnifiedSstableWriterFactory::new(context.sstable_store.clone());

        let builder_factory = RemoteBuilderFactory::<_, BlockedXor16FilterBuilder> {
            object_id_getter,
            limiter: context.memory_limiter.clone(),
            options,
            policy: task_config.cache_policy,
            remote_rpc_cost: get_id_time,
            compaction_catalog_agent_ref: compaction_catalog_agent_ref.clone(),
            sstable_writer_factory: factory,
            _phantom: PhantomData,
        };
        let sst_builder = CapacitySplitTableBuilder::new(
            builder_factory,
            context.compactor_metrics.clone(),
            Some(task_progress.clone()),
            task_config.table_vnode_partition.clone(),
            context
                .storage_opts
                .compactor_concurrent_uploading_sst_count,
            compaction_catalog_agent_ref.clone(),
        );
        assert_eq!(
            task.input_ssts.len(),
            2,
            "TaskId {} target_level {:?} task {:?}",
            task.task_id,
            task.target_level,
            compact_task_to_string(&task)
        );
        let left = Box::new(ConcatSstableIterator::new(
            task.input_ssts[0].table_infos.clone(),
            context.sstable_store.clone(),
            task_progress.clone(),
            context.storage_opts.compactor_iter_max_io_retry_times,
        ));
        let right = Box::new(ConcatSstableIterator::new(
            task.input_ssts[1].table_infos.clone(),
            context.sstable_store,
            task_progress.clone(),
            context.storage_opts.compactor_iter_max_io_retry_times,
        ));

        // Cannot consume the watermarks here, because they may still be used by `check_compact_result`.
        let pk_prefix_state = PkPrefixSkipWatermarkState::from_safe_epoch_watermarks(
            task.pk_prefix_table_watermarks.clone(),
        );
        let non_pk_prefix_state = NonPkPrefixSkipWatermarkState::from_safe_epoch_watermarks(
            task.non_pk_prefix_table_watermarks.clone(),
            compaction_catalog_agent_ref.clone(),
        );
        let value_skip_watermark_state = ValueSkipWatermarkState::from_safe_epoch_watermarks(
            task.value_table_watermarks.clone(),
            compaction_catalog_agent_ref,
        );

        Self {
            executor: CompactTaskExecutor::new(
                sst_builder,
                task_config,
                task_progress,
                pk_prefix_state,
                non_pk_prefix_state,
                value_skip_watermark_state,
                compaction_filter,
            ),
            left,
            right,
            task_id: task.task_id,
            metrics: context.compactor_metrics,
            compression_algorithm,
        }
    }

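    /// Merges the `left` and `right` inputs into output SSTs. Whenever the next block of one
    /// input lies entirely below the other input's current key (and no flush is pending), the
    /// raw, still-compressed block is copied into the output; otherwise the block is decoded and
    /// merged key by key. Returns the output SSTs together with the compaction statistics.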
    pub async fn run(mut self) -> HummockResult<(Vec<LocalSstableInfo>, CompactionStatistics)> {
        self.left.rewind().await?;
        self.right.rewind().await?;
        let mut skip_raw_block_count = 0;
        let mut skip_raw_block_size = 0;
        while self.left.is_valid() && self.right.is_valid() {
            let ret = self
                .left
                .current_sstable()
                .key()
                .cmp(&self.right.current_sstable().key());
            let (first, second) = if ret == Ordering::Less {
                (&mut self.left, &mut self.right)
            } else {
                (&mut self.right, &mut self.left)
            };
            assert!(
                ret != Ordering::Equal,
                "sst range overlap equal_key {:?}",
                self.left.current_sstable().key()
            );
            // No partially-consumed block: try to copy whole raw blocks from `first` while they
            // stay strictly below `second`'s current key.
            if first.current_sstable().iter.is_none() {
                let right_key = second.current_sstable().key();
                while first.current_sstable().is_valid() && !self.executor.builder.need_flush() {
                    let full_key = FullKey::decode(first.current_sstable().next_block_largest());
                    // The full key may be either an excluded or an included bound, so we do not
                    // allow them to be equal.
                    if full_key.user_key.ge(&right_key.user_key) {
                        break;
                    }
                    let smallest_key =
                        FullKey::decode(first.current_sstable().next_block_smallest());
                    if !self.executor.shall_copy_raw_block(&smallest_key) {
                        break;
                    }
                    let smallest_key = smallest_key.to_vec();

                    let (mut block, filter_data, mut meta) = first
                        .current_sstable()
                        .download_next_block()
                        .await?
                        .unwrap();
                    let algorithm = Block::get_algorithm(&block)?;
                    if algorithm == CompressionAlgorithm::None
                        && algorithm != self.compression_algorithm
                    {
                        block = BlockBuilder::compress_block(block, self.compression_algorithm)?;
                        meta.len = block.len() as u32;
                    }

                    let largest_key = first.current_sstable().current_block_largest();
                    let block_len = block.len() as u64;
                    let block_key_count = meta.total_key_count;

                    if self
                        .executor
                        .builder
                        .add_raw_block(block, filter_data, smallest_key, largest_key, meta)
                        .await?
                    {
                        skip_raw_block_size += block_len;
                        skip_raw_block_count += 1;
                    }
                    self.executor.may_report_process_key(block_key_count);
                    self.executor.clear();
                }
                if !first.current_sstable().is_valid() {
                    first.next_sstable().await?;
                    continue;
                }
                first.init_block_iter().await?;
            }

            let target_key = second.current_sstable().key();
            let iter = first.sstable_iter.as_mut().unwrap().iter.as_mut().unwrap();
            self.executor.run(iter, target_key).await?;
            if !iter.is_valid() {
                first.sstable_iter.as_mut().unwrap().iter.take();
                if !first.current_sstable().is_valid() {
                    first.next_sstable().await?;
                }
            }
        }
        // One input is exhausted; the rest of the other input can be appended directly.
        let rest_data = if !self.left.is_valid() {
            &mut self.right
        } else {
            &mut self.left
        };
        if rest_data.is_valid() {
            // Compact the remaining keys of the current block.
            let sstable_iter = rest_data.sstable_iter.as_mut().unwrap();
            let target_key = FullKey::decode(&sstable_iter.sstable.meta.largest_key);
            if let Some(iter) = sstable_iter.iter.as_mut() {
                self.executor.run(iter, target_key).await?;
                assert!(
                    !iter.is_valid(),
                    "iter should not be valid key {:?}",
                    iter.key()
                );
            }
            sstable_iter.iter.take();
        }

        while rest_data.is_valid() {
            let mut sstable_iter = rest_data.sstable_iter.take().unwrap();
            while sstable_iter.is_valid() {
                let smallest_key = FullKey::decode(sstable_iter.next_block_smallest()).to_vec();
                let (block, filter_data, block_meta) =
                    sstable_iter.download_next_block().await?.unwrap();
                // If the last user key was dropped as a delete tombstone and this block starts with
                // the same user key, the first key of this block must be dropped as well, so we
                // cannot copy this block directly.
                let need_deleted = self.executor.last_key.user_key.eq(&smallest_key.user_key)
                    && self.executor.last_key_is_delete;
                if self.executor.builder.need_flush()
                    || need_deleted
                    || !self.executor.shall_copy_raw_block(&smallest_key.to_ref())
                {
                    let largest_key = sstable_iter.sstable.meta.largest_key.clone();
                    let target_key = FullKey::decode(&largest_key);
                    sstable_iter.init_block_iter(block, block_meta.uncompressed_size as usize)?;
                    let mut iter = sstable_iter.iter.take().unwrap();
                    self.executor.run(&mut iter, target_key).await?;
                } else {
                    let largest_key = sstable_iter.current_block_largest();
                    let block_len = block.len() as u64;
                    let block_key_count = block_meta.total_key_count;
                    if self
                        .executor
                        .builder
                        .add_raw_block(block, filter_data, smallest_key, largest_key, block_meta)
                        .await?
                    {
                        skip_raw_block_count += 1;
                        skip_raw_block_size += block_len;
                    }
                    self.executor.may_report_process_key(block_key_count);
                    self.executor.clear();
                }
            }
            rest_data.next_sstable().await?;
        }
        let mut total_read_bytes = 0;
        for sst in &self.left.sstables {
            total_read_bytes += sst.sst_size;
        }
        for sst in &self.right.sstables {
            total_read_bytes += sst.sst_size;
        }
        self.metrics
            .compact_fast_runner_bytes
            .inc_by(skip_raw_block_size);
        tracing::info!(
            "OPTIMIZATION: skip {} blocks for task-{}, optimize {}% data compression",
            skip_raw_block_count,
            self.task_id,
            skip_raw_block_size * 100 / total_read_bytes,
        );

        let statistic = self.executor.take_statistics();
        let output_ssts = self.executor.builder.finish().await?;
        Compactor::report_progress(
            self.metrics.clone(),
            Some(self.executor.task_progress.clone()),
            &output_ssts,
            false,
        );
        let sst_infos = output_ssts
            .iter()
            .map(|sst| sst.sst_info.clone())
            .collect_vec();
        assert!(can_concat(&sst_infos));
        Ok((output_ssts, statistic))
    }
}

pub struct CompactTaskExecutor<F: TableBuilderFactory, C: CompactionFilter> {
    last_key: FullKey<Vec<u8>>,
    compaction_statistics: CompactionStatistics,
    last_table_id: Option<TableId>,
    last_table_stats: TableStats,
    builder: CapacitySplitTableBuilder<F>,
    task_config: TaskConfig,
    task_progress: Arc<TaskProgress>,
    pk_prefix_skip_watermark_state: PkPrefixSkipWatermarkState,
    last_key_is_delete: bool,
    progress_key_num: u32,
    non_pk_prefix_skip_watermark_state: NonPkPrefixSkipWatermarkState,
    value_skip_watermark_state: ValueSkipWatermarkState,
    compaction_filter: C,
}

impl<F: TableBuilderFactory, C: CompactionFilter> CompactTaskExecutor<F, C> {
    pub fn new(
        builder: CapacitySplitTableBuilder<F>,
        task_config: TaskConfig,
        task_progress: Arc<TaskProgress>,
        pk_prefix_skip_watermark_state: PkPrefixSkipWatermarkState,
        non_pk_prefix_skip_watermark_state: NonPkPrefixSkipWatermarkState,
        value_skip_watermark_state: ValueSkipWatermarkState,
        compaction_filter: C,
    ) -> Self {
        Self {
            builder,
            task_config,
            last_key: FullKey::default(),
            last_key_is_delete: false,
            compaction_statistics: CompactionStatistics::default(),
            last_table_id: None,
            last_table_stats: TableStats::default(),
            task_progress,
            pk_prefix_skip_watermark_state,
            progress_key_num: 0,
            non_pk_prefix_skip_watermark_state,
            value_skip_watermark_state,
            compaction_filter,
        }
    }

    fn take_statistics(&mut self) -> CompactionStatistics {
        if let Some(last_table_id) = self.last_table_id.take() {
            self.compaction_statistics
                .delta_drop_stat
                .insert(last_table_id, std::mem::take(&mut self.last_table_stats));
        }
        std::mem::take(&mut self.compaction_statistics)
    }

    fn clear(&mut self) {
        if !self.last_key.is_empty() {
            self.last_key = FullKey::default();
        }
        self.last_key_is_delete = false;
    }

    #[inline(always)]
    fn may_report_process_key(&mut self, key_count: u32) {
        const PROGRESS_KEY_INTERVAL: u32 = 100;
        self.progress_key_num += key_count;
        if self.progress_key_num > PROGRESS_KEY_INTERVAL {
            self.task_progress
                .inc_progress_key(self.progress_key_num as u64);
            self.progress_key_num = 0;
        }
    }

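    /// Appends keys from `iter` to the SST builder until the iterator is exhausted or its key
    /// exceeds `target_key`, dropping keys according to the GC, watermark, and compaction-filter
    /// rules below.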
    pub async fn run(
        &mut self,
        iter: &mut BlockIterator,
        target_key: FullKey<&[u8]>,
    ) -> HummockResult<()> {
        self.pk_prefix_skip_watermark_state.reset_watermark();
        self.non_pk_prefix_skip_watermark_state.reset_watermark();

        while iter.is_valid() && iter.key().le(&target_key) {
            let is_new_user_key =
                !self.last_key.is_empty() && iter.key().user_key != self.last_key.user_key.as_ref();
            self.compaction_statistics.iter_total_key_counts += 1;
            self.may_report_process_key(1);

            let mut drop = false;
            let value = HummockValue::from_slice(iter.value()).unwrap();
            let is_first_or_new_user_key = is_new_user_key || self.last_key.is_empty();
            if is_first_or_new_user_key {
                self.last_key.set(iter.key());
                self.last_key_is_delete = false;
            }

            // See note in `compactor_runner.rs`.
            if !self.task_config.retain_multiple_version
                && self.task_config.gc_delete_keys
                && value.is_delete()
            {
                drop = true;
                self.last_key_is_delete = true;
            } else if !self.task_config.retain_multiple_version && !is_first_or_new_user_key {
                drop = true;
            }

            if !drop && self.compaction_filter.should_delete(iter.key()) {
                drop = true;
            }

            if !drop && self.watermark_should_delete(&iter.key(), value) {
                drop = true;
                self.last_key_is_delete = true;
            }

            if self.last_table_id != Some(self.last_key.user_key.table_id) {
                if let Some(last_table_id) = self.last_table_id.take() {
                    self.compaction_statistics
                        .delta_drop_stat
                        .insert(last_table_id, std::mem::take(&mut self.last_table_stats));
                }
                self.last_table_id = Some(self.last_key.user_key.table_id);
            }

            if drop {
                self.compaction_statistics.iter_drop_key_counts += 1;

                let should_count = match self.task_config.stats_target_table_ids.as_ref() {
                    Some(target_table_ids) => {
                        target_table_ids.contains(&self.last_key.user_key.table_id)
                    }
                    None => true,
                };
                if should_count {
                    self.last_table_stats.total_key_count -= 1;
                    self.last_table_stats.total_key_size -= self.last_key.encoded_len() as i64;
                    self.last_table_stats.total_value_size -= value.encoded_len() as i64;
                }
                iter.next();
                continue;
            }
            self.builder
                .add_full_key(iter.key(), value, is_new_user_key)
                .await?;
            iter.next();
        }
        Ok(())
    }

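    /// Decides whether the block starting at `smallest_key` may be copied into the output as a
    /// raw block. Copying is refused when the previously emitted user key was dropped as a delete
    /// tombstone and reappears at the start of this block, when a table watermark may drop keys
    /// within the block, or when the compaction filter would drop the block's first key.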
    pub fn shall_copy_raw_block(&mut self, smallest_key: &FullKey<&[u8]>) -> bool {
        if self.last_key_is_delete && self.last_key.user_key.as_ref().eq(&smallest_key.user_key) {
            // If the last key is a delete tombstone, we cannot append the original block,
            // because a deleted key could become visible to the user again.
            return false;
        }

        if self.watermark_may_delete(smallest_key) {
            return false;
        }

        // Check the compaction filter.
        if self.compaction_filter.should_delete(*smallest_key) {
            return false;
        }

        true
    }

    fn watermark_may_delete(&mut self, key: &FullKey<&[u8]>) -> bool {
        // Correctness relies on the assumption that `PkPrefixSkipWatermarkState` and
        // `NonPkPrefixSkipWatermarkState` never inspect the value, so passing `unused_put` is safe.
        let pk_prefix_has_watermark = self.pk_prefix_skip_watermark_state.has_watermark();
        let non_pk_prefix_has_watermark = self.non_pk_prefix_skip_watermark_state.has_watermark();
        if pk_prefix_has_watermark || non_pk_prefix_has_watermark {
            let unused = vec![];
            let unused_put = HummockValue::Put(unused.as_slice());
            if (pk_prefix_has_watermark
                && self
                    .pk_prefix_skip_watermark_state
                    .should_delete(key, unused_put))
                || (non_pk_prefix_has_watermark
                    && self
                        .non_pk_prefix_skip_watermark_state
                        .should_delete(key, unused_put))
            {
                return true;
            }
        }
        self.value_skip_watermark_state.has_watermark()
            && self.value_skip_watermark_state.may_delete(key)
    }

    fn watermark_should_delete(
        &mut self,
        key: &FullKey<&[u8]>,
        value: HummockValue<&[u8]>,
    ) -> bool {
        (self.pk_prefix_skip_watermark_state.has_watermark()
            && self
                .pk_prefix_skip_watermark_state
                .should_delete(key, value))
            || (self.non_pk_prefix_skip_watermark_state.has_watermark()
                && self
                    .non_pk_prefix_skip_watermark_state
                    .should_delete(key, value))
            || (self.value_skip_watermark_state.has_watermark()
                && self.value_skip_watermark_state.should_delete(key, value))
    }
}