risingwave_storage/hummock/compactor/
fast_compactor_runner.rs

// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::cmp::Ordering;
use std::collections::HashSet;
use std::marker::PhantomData;
use std::sync::atomic::AtomicU64;
use std::sync::{Arc, atomic};
use std::time::Instant;

use await_tree::{InstrumentAwait, SpanExt};
use bytes::Bytes;
use fail::fail_point;
use itertools::Itertools;
use risingwave_common::catalog::TableId;
use risingwave_hummock_sdk::compact_task::CompactTask;
use risingwave_hummock_sdk::key::FullKey;
use risingwave_hummock_sdk::key_range::KeyRange;
use risingwave_hummock_sdk::sstable_info::SstableInfo;
use risingwave_hummock_sdk::table_stats::TableStats;
use risingwave_hummock_sdk::{EpochWithGap, LocalSstableInfo, can_concat, compact_task_to_string};

use crate::compaction_catalog_manager::CompactionCatalogAgentRef;
use crate::hummock::block_stream::BlockDataStream;
use crate::hummock::compactor::task_progress::TaskProgress;
use crate::hummock::compactor::{
    CompactionFilter, CompactionStatistics, Compactor, CompactorContext, MultiCompactionFilter,
    RemoteBuilderFactory, TaskConfig,
};
use crate::hummock::iterator::{
    NonPkPrefixSkipWatermarkState, PkPrefixSkipWatermarkState, SkipWatermarkState,
    ValueSkipWatermarkState,
};
use crate::hummock::multi_builder::{CapacitySplitTableBuilder, TableBuilderFactory};
use crate::hummock::sstable_store::SstableStoreRef;
use crate::hummock::value::HummockValue;
use crate::hummock::{
    Block, BlockBuilder, BlockHolder, BlockIterator, BlockMeta, BlockedXor16FilterBuilder,
    CachePolicy, CompressionAlgorithm, GetObjectId, HummockResult, SstableBuilderOptions,
    TableHolder, UnifiedSstableWriterFactory,
};
use crate::monitor::{CompactorMetrics, StoreLocalStatistic};

/// Iterates over the KV-pairs of an SST while downloading it.
pub struct BlockStreamIterator {
    /// The downloading stream.
    block_stream: Option<BlockDataStream>,

    next_block_index: usize,

    /// For key sanity checks of divided SSTs and for debugging.
    sstable: TableHolder,
    iter: Option<BlockIterator>,
    task_progress: Arc<TaskProgress>,

    // For recreating the block stream when an I/O retry is needed.
    sstable_store: SstableStoreRef,
    sstable_info: SstableInfo,
    io_retry_times: usize,
    max_io_retry_times: usize,
    stats_ptr: Arc<AtomicU64>,
}

impl BlockStreamIterator {
    // We have to handle two internal iterators.
    //   `block_stream`: iterates over the blocks of the table.
    //     `block_iter`: iterates over the KV-pairs of the current block.
    // These iterators work in different ways.
    // `BlockIterator` works as follows: after `new()`, we call `seek()`, which brings us to
    // the first element. Calling `next()` then advances to the second element and does not
    // return anything.
    // `BlockStream` follows a different approach: after `new()` we do not seek; instead,
    // `next()` returns the first value.

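    // An illustrative sketch of the two contracts (comments only, not compiled;
    // `holder` and `stream` stand in for values of the respective types):
    //
    //   let mut block_iter = BlockIterator::new(holder);
    //   block_iter.seek_to_first();        // position at the first KV-pair
    //   let first_key = block_iter.key();
    //   block_iter.next();                 // advance; returns nothing
    //
    //   let first_block = stream.next_block_impl().await?;  // no seek: yields block 0
    //   let second_block = stream.next_block_impl().await?;
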
    /// Initialises a new [`BlockStreamIterator`] over the blocks of the given sstable. The
    /// underlying [`BlockDataStream`] is created lazily on the first call to
    /// `download_next_block`.
    pub fn new(
        sstable: TableHolder,
        task_progress: Arc<TaskProgress>,
        sstable_store: SstableStoreRef,
        sstable_info: SstableInfo,
        max_io_retry_times: usize,
        stats_ptr: Arc<AtomicU64>,
    ) -> Self {
        Self {
            block_stream: None,
            next_block_index: 0,
            sstable,
            iter: None,
            task_progress,
            sstable_store,
            sstable_info,
            io_retry_times: 0,
            max_io_retry_times,
            stats_ptr,
        }
    }

    async fn create_stream(&mut self) -> HummockResult<()> {
        // Fast compaction only supports single-table compaction (the SST is not split),
        // so we don't need to filter `block_metas` by `table_id` and `key_range`.
        let block_stream = self
            .sstable_store
            .get_stream_for_blocks(
                self.sstable_info.object_id,
                &self.sstable.meta.block_metas[self.next_block_index..],
            )
            .instrument_await("stream_iter_get_stream".verbose())
            .await?;
        self.block_stream = Some(block_stream);
        Ok(())
    }

    /// Wrapper function for `self.block_stream.next()` which allows us to measure the time needed.
    pub(crate) async fn download_next_block(
        &mut self,
    ) -> HummockResult<Option<(Bytes, Vec<u8>, BlockMeta)>> {
        let now = Instant::now();
        let _time_stat = scopeguard::guard(self.stats_ptr.clone(), |stats_ptr: Arc<AtomicU64>| {
            let add = (now.elapsed().as_secs_f64() * 1000.0).ceil();
            stats_ptr.fetch_add(add as u64, atomic::Ordering::Relaxed);
        });
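        // Retry loop: the stream is (re)created lazily; on a retryable object-store error
        // we drop the stream so that the next iteration recreates it starting from
        // `next_block_index`, up to `max_io_retry_times` attempts.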
        loop {
            let ret = match &mut self.block_stream {
                Some(block_stream) => block_stream.next_block_impl().await,
                None => {
                    self.create_stream().await?;
                    continue;
                }
            };
            match ret {
                Ok(Some((data, _))) => {
                    let meta = self.sstable.meta.block_metas[self.next_block_index].clone();
                    let filter_block = self
                        .sstable
                        .filter_reader
                        .get_block_raw_filter(self.next_block_index);
                    self.next_block_index += 1;
                    return Ok(Some((data, filter_block, meta)));
                }

                Ok(None) => break,

                Err(e) => {
                    if !e.is_object_error() || self.io_retry_times >= self.max_io_retry_times {
                        return Err(e);
                    }

                    self.block_stream.take();
                    self.io_retry_times += 1;
                    fail_point!("create_stream_err");

                    tracing::warn!(
                        "fast compact retried creating stream for sstable {} times, sstinfo={}",
                        self.io_retry_times,
                        format!(
                            "object_id={}, sst_id={}, meta_offset={}, table_ids={:?}",
                            self.sstable_info.object_id,
                            self.sstable_info.sst_id,
                            self.sstable_info.meta_offset,
                            self.sstable_info.table_ids
                        )
                    );
                }
            }
        }

        self.next_block_index = self.sstable.meta.block_metas.len();
        self.iter.take();
        Ok(None)
    }

    pub(crate) fn init_block_iter(
        &mut self,
        buf: Bytes,
        uncompressed_capacity: usize,
    ) -> HummockResult<()> {
        let block = Block::decode(buf, uncompressed_capacity)?;
        let mut iter = BlockIterator::new(BlockHolder::from_owned_block(Box::new(block)));
        iter.seek_to_first();
        self.iter = Some(iter);
        Ok(())
    }

    fn next_block_smallest(&self) -> &[u8] {
        self.sstable.meta.block_metas[self.next_block_index]
            .smallest_key
            .as_ref()
    }

    fn next_block_largest(&self) -> &[u8] {
        if self.next_block_index + 1 < self.sstable.meta.block_metas.len() {
            self.sstable.meta.block_metas[self.next_block_index + 1]
                .smallest_key
                .as_ref()
        } else {
            self.sstable.meta.largest_key.as_ref()
        }
    }

    fn current_block_largest(&self) -> Vec<u8> {
        if self.next_block_index < self.sstable.meta.block_metas.len() {
            let mut largest_key = FullKey::decode(
                self.sstable.meta.block_metas[self.next_block_index]
                    .smallest_key
                    .as_ref(),
            );
            // Do not include this key, because it is the smallest key of the next block.
            largest_key.epoch_with_gap = EpochWithGap::new_max_epoch();
            largest_key.encode()
        } else {
            self.sstable.meta.largest_key.clone()
        }
    }

    fn key(&self) -> FullKey<&[u8]> {
        match self.iter.as_ref() {
            Some(iter) => iter.key(),
            None => FullKey::decode(
                self.sstable.meta.block_metas[self.next_block_index]
                    .smallest_key
                    .as_ref(),
            ),
        }
    }

    pub(crate) fn is_valid(&self) -> bool {
        self.iter.is_some() || self.next_block_index < self.sstable.meta.block_metas.len()
    }

    #[cfg(test)]
    #[cfg(feature = "failpoints")]
    pub(crate) fn iter_mut(&mut self) -> &mut BlockIterator {
        self.iter.as_mut().unwrap()
    }
}

impl Drop for BlockStreamIterator {
    fn drop(&mut self) {
        self.task_progress.dec_num_pending_read_io();
    }
}

/// Iterates over the KV-pairs of a given list of SSTs. The key ranges of these SSTs are assumed
/// to be consecutive and non-overlapping.
pub struct ConcatSstableIterator {
    /// The iterator of the current table.
    sstable_iter: Option<BlockStreamIterator>,

    /// Current table index.
    cur_idx: usize,

    /// All non-overlapping tables.
    sstables: Vec<SstableInfo>,

    sstable_store: SstableStoreRef,

    stats: StoreLocalStatistic,
    task_progress: Arc<TaskProgress>,

    max_io_retry_times: usize,
}

impl ConcatSstableIterator {
    /// Caller should make sure that `sst_infos` are non-overlapping and arranged in ascending
    /// order, since this iterator only moves forward.
    pub fn new(
        sst_infos: Vec<SstableInfo>,
        sstable_store: SstableStoreRef,
        task_progress: Arc<TaskProgress>,
        max_io_retry_times: usize,
    ) -> Self {
        Self {
            sstable_iter: None,
            cur_idx: 0,
            sstables: sst_infos,
            sstable_store,
            task_progress,
            stats: StoreLocalStatistic::default(),
            max_io_retry_times,
        }
    }

    pub async fn rewind(&mut self) -> HummockResult<()> {
        self.seek_idx(0).await
    }

    pub async fn next_sstable(&mut self) -> HummockResult<()> {
        self.seek_idx(self.cur_idx + 1).await
    }

    pub fn current_sstable(&mut self) -> &mut BlockStreamIterator {
        self.sstable_iter.as_mut().unwrap()
    }

    pub async fn init_block_iter(&mut self) -> HummockResult<()> {
        if let Some(sstable) = self.sstable_iter.as_mut() {
            if sstable.iter.is_some() {
                return Ok(());
            }
            let (buf, _, meta) = sstable.download_next_block().await?.unwrap();
            sstable.init_block_iter(buf, meta.uncompressed_size as usize)?;
        }
        Ok(())
    }

    pub fn is_valid(&self) -> bool {
        self.cur_idx < self.sstables.len()
    }

    /// Resets the iterator and loads the SST at index `idx`, if it exists.
    async fn seek_idx(&mut self, idx: usize) -> HummockResult<()> {
        self.sstable_iter.take();
        self.cur_idx = idx;
        if self.cur_idx < self.sstables.len() {
            let sstable_info = &self.sstables[self.cur_idx];
            let sstable = self
                .sstable_store
                .sstable(sstable_info, &mut self.stats)
                .instrument_await("stream_iter_sstable".verbose())
                .await?;
            self.task_progress.inc_num_pending_read_io();

            let sstable_iter = BlockStreamIterator::new(
                sstable,
                self.task_progress.clone(),
                self.sstable_store.clone(),
                sstable_info.clone(),
                self.max_io_retry_times,
                self.stats.remote_io_time.clone(),
            );
            self.sstable_iter = Some(sstable_iter);
        }
        Ok(())
    }
}

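/// Fast compaction runner: merges exactly two non-overlapping input levels, copying raw
/// blocks into the output wherever possible instead of decoding and re-encoding every key.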
pub struct CompactorRunner<C: CompactionFilter = MultiCompactionFilter> {
    left: Box<ConcatSstableIterator>,
    right: Box<ConcatSstableIterator>,
    task_id: u64,
    executor: CompactTaskExecutor<
        RemoteBuilderFactory<UnifiedSstableWriterFactory, BlockedXor16FilterBuilder>,
        C,
    >,
    compression_algorithm: CompressionAlgorithm,
    metrics: Arc<CompactorMetrics>,
}

impl<C: CompactionFilter> CompactorRunner<C> {
    pub fn new(
        context: CompactorContext,
        task: CompactTask,
        compaction_catalog_agent_ref: CompactionCatalogAgentRef,
        object_id_getter: Arc<dyn GetObjectId>,
        task_progress: Arc<TaskProgress>,
        compaction_filter: C,
    ) -> Self {
        let mut options: SstableBuilderOptions = context.storage_opts.as_ref().into();
        let compression_algorithm: CompressionAlgorithm = task.compression_algorithm.into();
        options.compression_algorithm = compression_algorithm;
        options.capacity = task.target_file_size as usize;
        // Disable vnode key-range hints for the fast compaction path by default.
        options.max_vnode_key_range_bytes = None;
        let get_id_time = Arc::new(AtomicU64::new(0));

        let key_range = KeyRange::inf();

        let task_config = TaskConfig {
            key_range,
            cache_policy: CachePolicy::NotFill,
            gc_delete_keys: task.gc_delete_keys,
            retain_multiple_version: false,
            stats_target_table_ids: Some(HashSet::from_iter(task.existing_table_ids.clone())),
            table_vnode_partition: task.table_vnode_partition.clone(),
            use_block_based_filter: true,
            table_schemas: Default::default(),
            disable_drop_column_optimization: false,
        };
        let factory = UnifiedSstableWriterFactory::new(context.sstable_store.clone());

        let builder_factory = RemoteBuilderFactory::<_, BlockedXor16FilterBuilder> {
            object_id_getter,
            limiter: context.memory_limiter.clone(),
            options,
            policy: task_config.cache_policy,
            remote_rpc_cost: get_id_time,
            compaction_catalog_agent_ref: compaction_catalog_agent_ref.clone(),
            sstable_writer_factory: factory,
            _phantom: PhantomData,
        };
        let sst_builder = CapacitySplitTableBuilder::new(
            builder_factory,
            context.compactor_metrics.clone(),
            Some(task_progress.clone()),
            task_config.table_vnode_partition.clone(),
            context
                .storage_opts
                .compactor_concurrent_uploading_sst_count,
            compaction_catalog_agent_ref.clone(),
        );
        assert_eq!(
            task.input_ssts.len(),
            2,
            "TaskId {} target_level {:?} task {:?}",
            task.task_id,
            task.target_level,
            compact_task_to_string(&task)
        );
        let left = Box::new(ConcatSstableIterator::new(
            task.input_ssts[0].table_infos.clone(),
            context.sstable_store.clone(),
            task_progress.clone(),
            context.storage_opts.compactor_iter_max_io_retry_times,
        ));
        let right = Box::new(ConcatSstableIterator::new(
            task.input_ssts[1].table_infos.clone(),
            context.sstable_store,
            task_progress.clone(),
            context.storage_opts.compactor_iter_max_io_retry_times,
        ));

        // We cannot consume the watermarks here, because they may still be used by
        // `check_compact_result`.
        let pk_prefix_state = PkPrefixSkipWatermarkState::from_safe_epoch_watermarks(
            task.pk_prefix_table_watermarks.clone(),
        );
        let non_pk_prefix_state = NonPkPrefixSkipWatermarkState::from_safe_epoch_watermarks(
            task.non_pk_prefix_table_watermarks.clone(),
            compaction_catalog_agent_ref.clone(),
        );
        let value_skip_watermark_state = ValueSkipWatermarkState::from_safe_epoch_watermarks(
            task.value_table_watermarks.clone(),
            compaction_catalog_agent_ref,
        );

        Self {
            executor: CompactTaskExecutor::new(
                sst_builder,
                task_config,
                task_progress,
                pk_prefix_state,
                non_pk_prefix_state,
                value_skip_watermark_state,
                compaction_filter,
            ),
            left,
            right,
            task_id: task.task_id,
            metrics: context.compactor_metrics,
            compression_algorithm,
        }
    }

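    /// Merges the two input levels. While both sides are valid, the side with the smaller
    /// current key is advanced: whole blocks are copied verbatim via `add_raw_block` when
    /// `shall_copy_raw_block` allows it, and are otherwise decoded and compacted key by key
    /// via `CompactTaskExecutor::run` up to the other side's current key. The remaining side
    /// is then drained in the same way.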
    pub async fn run(mut self) -> HummockResult<(Vec<LocalSstableInfo>, CompactionStatistics)> {
        self.left.rewind().await?;
        self.right.rewind().await?;
        let mut skip_raw_block_count = 0;
        let mut skip_raw_block_size = 0;
        while self.left.is_valid() && self.right.is_valid() {
            let ret = self
                .left
                .current_sstable()
                .key()
                .cmp(&self.right.current_sstable().key());
            let (first, second) = if ret == Ordering::Less {
                (&mut self.left, &mut self.right)
            } else {
                (&mut self.right, &mut self.left)
            };
            assert!(
                ret != Ordering::Equal,
                "sst range overlap equal_key {:?}",
                self.left.current_sstable().key()
            );
            if first.current_sstable().iter.is_none() {
                let right_key = second.current_sstable().key();
                while first.current_sstable().is_valid() && !self.executor.builder.need_flush() {
                    let full_key = FullKey::decode(first.current_sstable().next_block_largest());
                    // The full key may be either an excluded or an included bound, so we do
                    // not allow it to equal `right_key`.
                    if full_key.user_key.ge(&right_key.user_key) {
                        break;
                    }
                    let smallest_key =
                        FullKey::decode(first.current_sstable().next_block_smallest());
                    if !self.executor.shall_copy_raw_block(&smallest_key) {
                        break;
                    }
                    let smallest_key = smallest_key.to_vec();

                    let (mut block, filter_data, mut meta) = first
                        .current_sstable()
                        .download_next_block()
                        .await?
                        .unwrap();
                    let algorithm = Block::get_algorithm(&block)?;
                    if algorithm == CompressionAlgorithm::None
                        && algorithm != self.compression_algorithm
                    {
                        block = BlockBuilder::compress_block(block, self.compression_algorithm)?;
                        meta.len = block.len() as u32;
                    }

                    let largest_key = first.current_sstable().current_block_largest();
                    let block_len = block.len() as u64;
                    let block_key_count = meta.total_key_count;

                    if self
                        .executor
                        .builder
                        .add_raw_block(block, filter_data, smallest_key, largest_key, meta)
                        .await?
                    {
                        skip_raw_block_size += block_len;
                        skip_raw_block_count += 1;
                    }
                    self.executor.may_report_process_key(block_key_count);
                    self.executor.clear();
                }
                if !first.current_sstable().is_valid() {
                    first.next_sstable().await?;
                    continue;
                }
                first.init_block_iter().await?;
            }

            let target_key = second.current_sstable().key();
            let iter = first.sstable_iter.as_mut().unwrap().iter.as_mut().unwrap();
            self.executor.run(iter, target_key).await?;
            if !iter.is_valid() {
                first.sstable_iter.as_mut().unwrap().iter.take();
                if !first.current_sstable().is_valid() {
                    first.next_sstable().await?;
                }
            }
        }
        let rest_data = if !self.left.is_valid() {
            &mut self.right
        } else {
            &mut self.left
        };
        if rest_data.is_valid() {
            // Compact the remaining keys of the current block.
            let sstable_iter = rest_data.sstable_iter.as_mut().unwrap();
            let target_key = FullKey::decode(&sstable_iter.sstable.meta.largest_key);
            if let Some(iter) = sstable_iter.iter.as_mut() {
                self.executor.run(iter, target_key).await?;
                assert!(
                    !iter.is_valid(),
                    "iter should not be valid key {:?}",
                    iter.key()
                );
            }
            sstable_iter.iter.take();
        }

        while rest_data.is_valid() {
            let mut sstable_iter = rest_data.sstable_iter.take().unwrap();
            while sstable_iter.is_valid() {
                let smallest_key = FullKey::decode(sstable_iter.next_block_smallest()).to_vec();
                let (block, filter_data, block_meta) =
                    sstable_iter.download_next_block().await?.unwrap();
                // If the last processed key was a delete tombstone that we dropped and this
                // block's first key shares the same user key, that older version must also be
                // dropped, so we cannot copy this block verbatim.
                let need_deleted = self.executor.last_key.user_key.eq(&smallest_key.user_key)
                    && self.executor.last_key_is_delete;
                if self.executor.builder.need_flush()
                    || need_deleted
                    || !self.executor.shall_copy_raw_block(&smallest_key.to_ref())
                {
                    let largest_key = sstable_iter.sstable.meta.largest_key.clone();
                    let target_key = FullKey::decode(&largest_key);
                    sstable_iter.init_block_iter(block, block_meta.uncompressed_size as usize)?;
                    let mut iter = sstable_iter.iter.take().unwrap();
                    self.executor.run(&mut iter, target_key).await?;
                } else {
                    let largest_key = sstable_iter.current_block_largest();
                    let block_len = block.len() as u64;
                    let block_key_count = block_meta.total_key_count;
                    if self
                        .executor
                        .builder
                        .add_raw_block(block, filter_data, smallest_key, largest_key, block_meta)
                        .await?
                    {
                        skip_raw_block_count += 1;
                        skip_raw_block_size += block_len;
                    }
                    self.executor.may_report_process_key(block_key_count);
                    self.executor.clear();
                }
            }
            rest_data.next_sstable().await?;
        }
        let mut total_read_bytes = 0;
        for sst in &self.left.sstables {
            total_read_bytes += sst.sst_size;
        }
        for sst in &self.right.sstables {
            total_read_bytes += sst.sst_size;
        }
        self.metrics
            .compact_fast_runner_bytes
            .inc_by(skip_raw_block_size);
        tracing::info!(
            "OPTIMIZATION: skipped {} blocks for task-{}, copied {}% of input data without recompression",
            skip_raw_block_count,
            self.task_id,
            skip_raw_block_size * 100 / total_read_bytes,
        );

        let statistic = self.executor.take_statistics();
        let output_ssts = self.executor.builder.finish().await?;
        Compactor::report_progress(
            self.metrics.clone(),
            Some(self.executor.task_progress.clone()),
            &output_ssts,
            false,
        );
        let sst_infos = output_ssts
            .iter()
            .map(|sst| sst.sst_info.clone())
            .collect_vec();
        assert!(can_concat(&sst_infos));
        Ok((output_ssts, statistic))
    }
}

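/// Key-by-key compaction state for the fast runner: tracks the last user key seen (for
/// multi-version GC and tombstone handling), per-table drop statistics, and the watermark
/// and compaction-filter states that decide which keys to drop.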
pub struct CompactTaskExecutor<F: TableBuilderFactory, C: CompactionFilter> {
    last_key: FullKey<Vec<u8>>,
    compaction_statistics: CompactionStatistics,
    last_table_id: Option<TableId>,
    last_table_stats: TableStats,
    builder: CapacitySplitTableBuilder<F>,
    task_config: TaskConfig,
    task_progress: Arc<TaskProgress>,
    pk_prefix_skip_watermark_state: PkPrefixSkipWatermarkState,
    last_key_is_delete: bool,
    progress_key_num: u32,
    non_pk_prefix_skip_watermark_state: NonPkPrefixSkipWatermarkState,
    value_skip_watermark_state: ValueSkipWatermarkState,
    compaction_filter: C,
}

impl<F: TableBuilderFactory, C: CompactionFilter> CompactTaskExecutor<F, C> {
    pub fn new(
        builder: CapacitySplitTableBuilder<F>,
        task_config: TaskConfig,
        task_progress: Arc<TaskProgress>,
        pk_prefix_skip_watermark_state: PkPrefixSkipWatermarkState,
        non_pk_prefix_skip_watermark_state: NonPkPrefixSkipWatermarkState,
        value_skip_watermark_state: ValueSkipWatermarkState,
        compaction_filter: C,
    ) -> Self {
        Self {
            builder,
            task_config,
            last_key: FullKey::default(),
            last_key_is_delete: false,
            compaction_statistics: CompactionStatistics::default(),
            last_table_id: None,
            last_table_stats: TableStats::default(),
            task_progress,
            pk_prefix_skip_watermark_state,
            progress_key_num: 0,
            non_pk_prefix_skip_watermark_state,
            value_skip_watermark_state,
            compaction_filter,
        }
    }

    fn take_statistics(&mut self) -> CompactionStatistics {
        if let Some(last_table_id) = self.last_table_id.take() {
            self.compaction_statistics
                .delta_drop_stat
                .insert(last_table_id, std::mem::take(&mut self.last_table_stats));
        }
        std::mem::take(&mut self.compaction_statistics)
    }

    fn clear(&mut self) {
        if !self.last_key.is_empty() {
            self.last_key = FullKey::default();
        }
        self.last_key_is_delete = false;
    }

    #[inline(always)]
    fn may_report_process_key(&mut self, key_count: u32) {
        const PROGRESS_KEY_INTERVAL: u32 = 100;
        self.progress_key_num += key_count;
        if self.progress_key_num > PROGRESS_KEY_INTERVAL {
            self.task_progress
                .inc_progress_key(self.progress_key_num as u64);
            self.progress_key_num = 0;
        }
    }

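    /// Compacts keys from `iter` up to and including `target_key`, applying in order:
    /// multi-version GC (`retain_multiple_version` / `gc_delete_keys`), the compaction
    /// filter, and watermark-based deletion. Dropped keys are charged to the per-table drop
    /// statistics; surviving keys are appended to the SST builder.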
    pub async fn run(
        &mut self,
        iter: &mut BlockIterator,
        target_key: FullKey<&[u8]>,
    ) -> HummockResult<()> {
        self.pk_prefix_skip_watermark_state.reset_watermark();
        self.non_pk_prefix_skip_watermark_state.reset_watermark();

        while iter.is_valid() && iter.key().le(&target_key) {
            let is_new_user_key =
                !self.last_key.is_empty() && iter.key().user_key != self.last_key.user_key.as_ref();
            self.compaction_statistics.iter_total_key_counts += 1;
            self.may_report_process_key(1);

            let mut drop = false;
            let value = HummockValue::from_slice(iter.value()).unwrap();
            let is_first_or_new_user_key = is_new_user_key || self.last_key.is_empty();
            if is_first_or_new_user_key {
                self.last_key.set(iter.key());
                self.last_key_is_delete = false;
            }

            // See note in `compactor_runner.rs`.
            if !self.task_config.retain_multiple_version
                && self.task_config.gc_delete_keys
                && value.is_delete()
            {
                drop = true;
                self.last_key_is_delete = true;
            } else if !self.task_config.retain_multiple_version && !is_first_or_new_user_key {
                drop = true;
            }

            if !drop && self.compaction_filter.should_delete(iter.key()) {
                drop = true;
            }

            if !drop && self.watermark_should_delete(&iter.key(), value) {
                drop = true;
                self.last_key_is_delete = true;
            }

            if self.last_table_id != Some(self.last_key.user_key.table_id) {
                if let Some(last_table_id) = self.last_table_id.take() {
                    self.compaction_statistics
                        .delta_drop_stat
                        .insert(last_table_id, std::mem::take(&mut self.last_table_stats));
                }
                self.last_table_id = Some(self.last_key.user_key.table_id);
            }

            if drop {
                self.compaction_statistics.iter_drop_key_counts += 1;

                let should_count = match self.task_config.stats_target_table_ids.as_ref() {
                    Some(target_table_ids) => {
                        target_table_ids.contains(&self.last_key.user_key.table_id)
                    }
                    None => true,
                };
                if should_count {
                    self.last_table_stats.total_key_count -= 1;
                    self.last_table_stats.total_key_size -= self.last_key.encoded_len() as i64;
                    self.last_table_stats.total_value_size -= value.encoded_len() as i64;
                }
                iter.next();
                continue;
            }
            self.builder
                .add_full_key(iter.key(), value, is_new_user_key)
                .await?;
            iter.next();
        }
        Ok(())
    }

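    /// Returns whether the upcoming block starting at `smallest_key` may be copied into the
    /// output verbatim, i.e. without decoding the block and re-examining each key.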
    pub fn shall_copy_raw_block(&mut self, smallest_key: &FullKey<&[u8]>) -> bool {
        if self.last_key_is_delete && self.last_key.user_key.as_ref().eq(&smallest_key.user_key) {
            // If the last key is a delete tombstone, we cannot append the original block,
            // because a deleted key could become visible to the user again.
            return false;
        }

        if self.watermark_may_delete(smallest_key) {
            return false;
        }

        // Check the compaction filter.
        if self.compaction_filter.should_delete(*smallest_key) {
            return false;
        }

        true
    }

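    /// Conservative check used by `shall_copy_raw_block`: returns `true` if some table
    /// watermark might delete `key`, in which case the containing block must be compacted
    /// key by key rather than copied verbatim.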
    fn watermark_may_delete(&mut self, key: &FullKey<&[u8]>) -> bool {
        // Correctness relies on the assumption that `PkPrefixSkipWatermarkState` and
        // `NonPkPrefixSkipWatermarkState` never inspect the value, so passing the dummy
        // `unused_put` below is safe.
        let pk_prefix_has_watermark = self.pk_prefix_skip_watermark_state.has_watermark();
        let non_pk_prefix_has_watermark = self.non_pk_prefix_skip_watermark_state.has_watermark();
        if pk_prefix_has_watermark || non_pk_prefix_has_watermark {
            let unused = vec![];
            let unused_put = HummockValue::Put(unused.as_slice());
            if (pk_prefix_has_watermark
                && self
                    .pk_prefix_skip_watermark_state
                    .should_delete(key, unused_put))
                || (non_pk_prefix_has_watermark
                    && self
                        .non_pk_prefix_skip_watermark_state
                        .should_delete(key, unused_put))
            {
                return true;
            }
        }
        self.value_skip_watermark_state.has_watermark()
            && self.value_skip_watermark_state.may_delete(key)
    }

    fn watermark_should_delete(
        &mut self,
        key: &FullKey<&[u8]>,
        value: HummockValue<&[u8]>,
    ) -> bool {
        (self.pk_prefix_skip_watermark_state.has_watermark()
            && self
                .pk_prefix_skip_watermark_state
                .should_delete(key, value))
            || (self.non_pk_prefix_skip_watermark_state.has_watermark()
                && self
                    .non_pk_prefix_skip_watermark_state
                    .should_delete(key, value))
            || (self.value_skip_watermark_state.has_watermark()
                && self.value_skip_watermark_state.should_delete(key, value))
    }
}