risingwave_storage/hummock/compactor/fast_compactor_runner.rs

// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::cmp::Ordering;
use std::collections::{BTreeMap, HashSet};
use std::marker::PhantomData;
use std::sync::atomic::AtomicU64;
use std::sync::{Arc, atomic};
use std::time::Instant;

use await_tree::{InstrumentAwait, SpanExt};
use bytes::Bytes;
use fail::fail_point;
use itertools::Itertools;
use risingwave_common::catalog::TableId;
use risingwave_hummock_sdk::compact_task::CompactTask;
use risingwave_hummock_sdk::key::FullKey;
use risingwave_hummock_sdk::key_range::KeyRange;
use risingwave_hummock_sdk::sstable_info::SstableInfo;
use risingwave_hummock_sdk::table_stats::TableStats;
use risingwave_hummock_sdk::table_watermark::TableWatermarks;
use risingwave_hummock_sdk::{EpochWithGap, LocalSstableInfo, can_concat, compact_task_to_string};

use crate::compaction_catalog_manager::CompactionCatalogAgentRef;
use crate::hummock::block_stream::BlockDataStream;
use crate::hummock::compactor::task_progress::TaskProgress;
use crate::hummock::compactor::{
    CompactionFilter, CompactionStatistics, Compactor, CompactorContext, MultiCompactionFilter,
    RemoteBuilderFactory, TaskConfig,
};
use crate::hummock::iterator::{
    NonPkPrefixSkipWatermarkState, PkPrefixSkipWatermarkState, SkipWatermarkState,
    ValueSkipWatermarkState,
};
use crate::hummock::multi_builder::{CapacitySplitTableBuilder, TableBuilderFactory};
use crate::hummock::sstable_store::SstableStoreRef;
use crate::hummock::value::HummockValue;
use crate::hummock::{
    Block, BlockBuilder, BlockHolder, BlockIterator, BlockMeta, BlockedXor16FilterBuilder,
    CachePolicy, CompressionAlgorithm, GetObjectId, HummockResult, SstableBuilderOptions,
    TableHolder, UnifiedSstableWriterFactory,
};
use crate::monitor::{CompactorMetrics, StoreLocalStatistic};

/// Iterates over the KV-pairs of an SST while downloading it.
pub struct BlockStreamIterator {
    /// The downloading stream.
    block_stream: Option<BlockDataStream>,

    next_block_index: usize,

    /// For key sanity checks of divided SSTs and for debugging.
    sstable: TableHolder,
    iter: Option<BlockIterator>,
    task_progress: Arc<TaskProgress>,

    // For recreating the block stream on retry.
    sstable_store: SstableStoreRef,
    sstable_info: SstableInfo,
    io_retry_times: usize,
    max_io_retry_times: usize,
    stats_ptr: Arc<AtomicU64>,
}

impl BlockStreamIterator {
    // We have to handle two internal iterators.
    //   `block_stream`: iterates over the blocks of the table.
    //     `block_iter`: iterates over the KV-pairs of the current block.
    // These iterators work in different ways.

    // BlockIterator works as follows: after new(), we call seek(). That brings us
    // to the first element. Calling next() then moves to the second element but does not
    // return anything.

    // BlockStream follows a different approach: after new(), we do not seek; instead, the first
    // call to next() returns the first value.

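    // A rough sketch of how a caller typically drives this type (illustrative only; the real
    // call sites are `ConcatSstableIterator` and `CompactorRunner::run`):
    //
    //     while iter.is_valid() {
    //         let (data, _filter, meta) = iter.download_next_block().await?.unwrap();
    //         // Either hand `data` to the SST builder as a raw block, or decode it and walk
    //         // its KV-pairs via the inner `BlockIterator`:
    //         iter.init_block_iter(data, meta.uncompressed_size as usize)?;
    //     }
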
    /// Initialises a new [`BlockStreamIterator`] which iterates over the blocks of the given
    /// sstable, lazily creating a [`BlockDataStream`] and reading blocks from it on demand.
    pub fn new(
        sstable: TableHolder,
        task_progress: Arc<TaskProgress>,
        sstable_store: SstableStoreRef,
        sstable_info: SstableInfo,
        max_io_retry_times: usize,
        stats_ptr: Arc<AtomicU64>,
    ) -> Self {
        Self {
            block_stream: None,
            next_block_index: 0,
            sstable,
            iter: None,
            task_progress,
            sstable_store,
            sstable_info,
            io_retry_times: 0,
            max_io_retry_times,
            stats_ptr,
        }
    }

    async fn create_stream(&mut self) -> HummockResult<()> {
        // Fast compaction only supports single-table compaction (not split SSTs),
        // so we don't need to filter the block_metas by table_id and key_range.
        let block_stream = self
            .sstable_store
            .get_stream_for_blocks(
                self.sstable_info.object_id,
                &self.sstable.meta.block_metas[self.next_block_index..],
            )
            .instrument_await("stream_iter_get_stream".verbose())
            .await?;
        self.block_stream = Some(block_stream);
        Ok(())
    }

    /// Wrapper function for `self.block_stream.next()` which allows us to measure the time needed.
    pub(crate) async fn download_next_block(
        &mut self,
    ) -> HummockResult<Option<(Bytes, Vec<u8>, BlockMeta)>> {
        let now = Instant::now();
        let _time_stat = scopeguard::guard(self.stats_ptr.clone(), |stats_ptr: Arc<AtomicU64>| {
            let add = (now.elapsed().as_secs_f64() * 1000.0).ceil();
            stats_ptr.fetch_add(add as u64, atomic::Ordering::Relaxed);
        });
        loop {
            let ret = match &mut self.block_stream {
                Some(block_stream) => block_stream.next_block_impl().await,
                None => {
                    self.create_stream().await?;
                    continue;
                }
            };
            match ret {
                Ok(Some((data, _))) => {
                    let meta = self.sstable.meta.block_metas[self.next_block_index].clone();
                    let filter_block = self
                        .sstable
                        .filter_reader
                        .get_block_raw_filter(self.next_block_index);
                    self.next_block_index += 1;
                    return Ok(Some((data, filter_block, meta)));
                }

                Ok(None) => break,

                Err(e) => {
                    if !e.is_object_error() || self.io_retry_times >= self.max_io_retry_times {
                        return Err(e);
                    }

                    self.block_stream.take();
                    self.io_retry_times += 1;
                    fail_point!("create_stream_err");

                    tracing::warn!(
                        "fast compact retry create stream for sstable {} times, sstinfo={}",
                        self.io_retry_times,
                        format!(
                            "object_id={}, sst_id={}, meta_offset={}, table_ids={:?}",
                            self.sstable_info.object_id,
                            self.sstable_info.sst_id,
                            self.sstable_info.meta_offset,
                            self.sstable_info.table_ids
                        )
                    );
                }
            }
        }

        self.next_block_index = self.sstable.meta.block_metas.len();
        self.iter.take();
        Ok(None)
    }
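
    // Note on retries: if `next_block_impl` fails with an object-store error, the block stream is
    // dropped and recreated starting from `next_block_index` (up to `max_io_retry_times`
    // attempts), so a transient I/O failure does not abort the whole download.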

    pub(crate) fn init_block_iter(
        &mut self,
        buf: Bytes,
        uncompressed_capacity: usize,
    ) -> HummockResult<()> {
        let block = Block::decode(buf, uncompressed_capacity)?;
        let mut iter = BlockIterator::new(BlockHolder::from_owned_block(Box::new(block)));
        iter.seek_to_first();
        self.iter = Some(iter);
        Ok(())
    }

    fn next_block_smallest(&self) -> &[u8] {
        self.sstable.meta.block_metas[self.next_block_index]
            .smallest_key
            .as_ref()
    }

    fn next_block_largest(&self) -> &[u8] {
        if self.next_block_index + 1 < self.sstable.meta.block_metas.len() {
            self.sstable.meta.block_metas[self.next_block_index + 1]
                .smallest_key
                .as_ref()
        } else {
            self.sstable.meta.largest_key.as_ref()
        }
    }

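    // `current_block_largest` below computes an exclusive upper bound for the block that was just
    // consumed: full keys sort by user key ascending and epoch descending, so re-encoding the
    // next block's smallest user key with `EpochWithGap::new_max_epoch()` yields a key that sorts
    // before every real version of that user key.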
    fn current_block_largest(&self) -> Vec<u8> {
        if self.next_block_index < self.sstable.meta.block_metas.len() {
            let mut largest_key = FullKey::decode(
                self.sstable.meta.block_metas[self.next_block_index]
                    .smallest_key
                    .as_ref(),
            );
            // Do not include this key because it is the smallest key of the next block.
            largest_key.epoch_with_gap = EpochWithGap::new_max_epoch();
            largest_key.encode()
        } else {
            self.sstable.meta.largest_key.clone()
        }
    }

    fn key(&self) -> FullKey<&[u8]> {
        match self.iter.as_ref() {
            Some(iter) => iter.key(),
            None => FullKey::decode(
                self.sstable.meta.block_metas[self.next_block_index]
                    .smallest_key
                    .as_ref(),
            ),
        }
    }

    pub(crate) fn is_valid(&self) -> bool {
        self.iter.is_some() || self.next_block_index < self.sstable.meta.block_metas.len()
    }

    #[cfg(test)]
    #[cfg(feature = "failpoints")]
    pub(crate) fn iter_mut(&mut self) -> &mut BlockIterator {
        self.iter.as_mut().unwrap()
    }
}

impl Drop for BlockStreamIterator {
    fn drop(&mut self) {
        self.task_progress.dec_num_pending_read_io();
    }
}

/// Iterates over the KV-pairs of a given list of SSTs. The key-ranges of these SSTs are assumed to
/// be consecutive and non-overlapping.
pub struct ConcatSstableIterator {
    /// The iterator of the current table.
    sstable_iter: Option<BlockStreamIterator>,

    /// Current table index.
    cur_idx: usize,

    /// All non-overlapping tables.
    sstables: Vec<SstableInfo>,

    sstable_store: SstableStoreRef,

    stats: StoreLocalStatistic,
    task_progress: Arc<TaskProgress>,

    max_io_retry_times: usize,
}

impl ConcatSstableIterator {
    /// The caller should make sure that the SSTs in `sst_infos` are non-overlapping,
    /// arranged in ascending order when it serves as a forward iterator,
    /// and arranged in descending order when it serves as a backward iterator.
    pub fn new(
        sst_infos: Vec<SstableInfo>,
        sstable_store: SstableStoreRef,
        task_progress: Arc<TaskProgress>,
        max_io_retry_times: usize,
    ) -> Self {
        Self {
            sstable_iter: None,
            cur_idx: 0,
            sstables: sst_infos,
            sstable_store,
            task_progress,
            stats: StoreLocalStatistic::default(),
            max_io_retry_times,
        }
    }

    pub async fn rewind(&mut self) -> HummockResult<()> {
        self.seek_idx(0).await
    }

    pub async fn next_sstable(&mut self) -> HummockResult<()> {
        self.seek_idx(self.cur_idx + 1).await
    }

    pub fn current_sstable(&mut self) -> &mut BlockStreamIterator {
        self.sstable_iter.as_mut().unwrap()
    }

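    // Note: the `unwrap` on `download_next_block` below assumes callers only invoke this while
    // the current [`BlockStreamIterator`] is still valid, so the stream is expected to yield at
    // least one more block (see `CompactorRunner::run`).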
    pub async fn init_block_iter(&mut self) -> HummockResult<()> {
        if let Some(sstable) = self.sstable_iter.as_mut() {
            if sstable.iter.is_some() {
                return Ok(());
            }
            let (buf, _, meta) = sstable.download_next_block().await?.unwrap();
            sstable.init_block_iter(buf, meta.uncompressed_size as usize)?;
        }
        Ok(())
    }

    pub fn is_valid(&self) -> bool {
        self.cur_idx < self.sstables.len()
    }

    /// Resets the iterator and loads the SST at index `idx` (if it is in range), creating a new
    /// [`BlockStreamIterator`] for it.
    async fn seek_idx(&mut self, idx: usize) -> HummockResult<()> {
        self.sstable_iter.take();
        self.cur_idx = idx;
        if self.cur_idx < self.sstables.len() {
            let sstable_info = &self.sstables[self.cur_idx];
            let sstable = self
                .sstable_store
                .sstable(sstable_info, &mut self.stats)
                .instrument_await("stream_iter_sstable".verbose())
                .await?;
            self.task_progress.inc_num_pending_read_io();

            let sstable_iter = BlockStreamIterator::new(
                sstable,
                self.task_progress.clone(),
                self.sstable_store.clone(),
                sstable_info.clone(),
                self.max_io_retry_times,
                self.stats.remote_io_time.clone(),
            );
            self.sstable_iter = Some(sstable_iter);
        }
        Ok(())
    }
}

pub struct CompactorRunner<C: CompactionFilter = MultiCompactionFilter> {
    left: Box<ConcatSstableIterator>,
    right: Box<ConcatSstableIterator>,
    task_id: u64,
    executor: CompactTaskExecutor<
        RemoteBuilderFactory<UnifiedSstableWriterFactory, BlockedXor16FilterBuilder>,
        C,
    >,
    compression_algorithm: CompressionAlgorithm,
    metrics: Arc<CompactorMetrics>,
}
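
// Rough shape of the fast compaction path: the task has exactly two input levels whose SSTs are
// internally sorted and non-overlapping, so `left` and `right` can be walked like a two-way merge.
// Blocks from the side that lags strictly behind the other side's current key are handed to the
// builder verbatim via `add_raw_block`; only blocks whose key ranges actually interleave with the
// other side are decoded and merged key by key by `CompactTaskExecutor::run`.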

impl<C: CompactionFilter> CompactorRunner<C> {
    pub fn new(
        context: CompactorContext,
        task: CompactTask,
        compaction_catalog_agent_ref: CompactionCatalogAgentRef,
        object_id_getter: Arc<dyn GetObjectId>,
        task_progress: Arc<TaskProgress>,
        compaction_filter: C,
    ) -> Self {
        let mut options: SstableBuilderOptions = context.storage_opts.as_ref().into();
        let compression_algorithm: CompressionAlgorithm = task.compression_algorithm.into();
        options.compression_algorithm = compression_algorithm;
        options.capacity = task.target_file_size as usize;
        let get_id_time = Arc::new(AtomicU64::new(0));

        let key_range = KeyRange::inf();

        let task_config = TaskConfig {
            key_range,
            cache_policy: CachePolicy::NotFill,
            gc_delete_keys: task.gc_delete_keys,
            retain_multiple_version: false,
            stats_target_table_ids: Some(HashSet::from_iter(task.existing_table_ids.clone())),
            task_type: task.task_type,
            table_vnode_partition: task.table_vnode_partition.clone(),
            use_block_based_filter: true,
            table_schemas: Default::default(),
            disable_drop_column_optimization: false,
        };
        let factory = UnifiedSstableWriterFactory::new(context.sstable_store.clone());

        let builder_factory = RemoteBuilderFactory::<_, BlockedXor16FilterBuilder> {
            object_id_getter,
            limiter: context.memory_limiter.clone(),
            options,
            policy: task_config.cache_policy,
            remote_rpc_cost: get_id_time,
            compaction_catalog_agent_ref: compaction_catalog_agent_ref.clone(),
            sstable_writer_factory: factory,
            _phantom: PhantomData,
        };
        let sst_builder = CapacitySplitTableBuilder::new(
            builder_factory,
            context.compactor_metrics.clone(),
            Some(task_progress.clone()),
            task_config.table_vnode_partition.clone(),
            context
                .storage_opts
                .compactor_concurrent_uploading_sst_count,
            compaction_catalog_agent_ref.clone(),
        );
        assert_eq!(
            task.input_ssts.len(),
            2,
            "TaskId {} target_level {:?} task {:?}",
            task.task_id,
            task.target_level,
            compact_task_to_string(&task)
        );
        let left = Box::new(ConcatSstableIterator::new(
            task.input_ssts[0].table_infos.clone(),
            context.sstable_store.clone(),
            task_progress.clone(),
            context.storage_opts.compactor_iter_max_io_retry_times,
        ));
        let right = Box::new(ConcatSstableIterator::new(
            task.input_ssts[1].table_infos.clone(),
            context.sstable_store,
            task_progress.clone(),
            context.storage_opts.compactor_iter_max_io_retry_times,
        ));

        // Cannot consume the watermarks here, because they may still be needed by `check_compact_result`.
        let pk_prefix_state = PkPrefixSkipWatermarkState::from_safe_epoch_watermarks(
            task.pk_prefix_table_watermarks.clone(),
        );
        let non_pk_prefix_state = NonPkPrefixSkipWatermarkState::from_safe_epoch_watermarks(
            task.non_pk_prefix_table_watermarks.clone(),
            compaction_catalog_agent_ref.clone(),
        );
        // TODO: get value_table_watermarks from compaction task
        let value_table_watermarks: BTreeMap<TableId, TableWatermarks> = BTreeMap::default();
        let value_skip_watermark_state = ValueSkipWatermarkState::from_safe_epoch_watermarks(
            value_table_watermarks,
            compaction_catalog_agent_ref,
        );

        Self {
            executor: CompactTaskExecutor::new(
                sst_builder,
                task_config,
                task_progress,
                pk_prefix_state,
                non_pk_prefix_state,
                value_skip_watermark_state,
                compaction_filter,
            ),
            left,
            right,
            task_id: task.task_id,
            metrics: context.compactor_metrics,
            compression_algorithm,
        }
    }

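    // Sketch of the merge loop below: while both inputs are valid, pick the side whose current
    // key is smaller (`first`). As long as `first`'s next block ends strictly before the other
    // side's current user key and `shall_copy_raw_block` allows it, that block is appended
    // verbatim (re-compressing uncompressed blocks when the target algorithm differs); otherwise
    // `first`'s current block is decoded and merged key by key up to the other side's key. Once
    // one side is exhausted, the remainder of the other side is flushed in the same fashion.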
    pub async fn run(mut self) -> HummockResult<(Vec<LocalSstableInfo>, CompactionStatistics)> {
        self.left.rewind().await?;
        self.right.rewind().await?;
        let mut skip_raw_block_count = 0;
        let mut skip_raw_block_size = 0;
        while self.left.is_valid() && self.right.is_valid() {
            let ret = self
                .left
                .current_sstable()
                .key()
                .cmp(&self.right.current_sstable().key());
            let (first, second) = if ret == Ordering::Less {
                (&mut self.left, &mut self.right)
            } else {
                (&mut self.right, &mut self.left)
            };
            assert!(
                ret != Ordering::Equal,
                "sst range overlap equal_key {:?}",
                self.left.current_sstable().key()
            );
            if first.current_sstable().iter.is_none() {
                let right_key = second.current_sstable().key();
                while first.current_sstable().is_valid() && !self.executor.builder.need_flush() {
                    let full_key = FullKey::decode(first.current_sstable().next_block_largest());
                    // The full key may be either an excluded or an included bound, so we do not
                    // allow them to be equal.
                    if full_key.user_key.ge(&right_key.user_key) {
                        break;
                    }
                    let smallest_key =
                        FullKey::decode(first.current_sstable().next_block_smallest());
                    if !self.executor.shall_copy_raw_block(&smallest_key) {
                        break;
                    }
                    let smallest_key = smallest_key.to_vec();

                    let (mut block, filter_data, mut meta) = first
                        .current_sstable()
                        .download_next_block()
                        .await?
                        .unwrap();
                    let algorithm = Block::get_algorithm(&block)?;
                    if algorithm == CompressionAlgorithm::None
                        && algorithm != self.compression_algorithm
                    {
                        block = BlockBuilder::compress_block(block, self.compression_algorithm)?;
                        meta.len = block.len() as u32;
                    }

                    let largest_key = first.current_sstable().current_block_largest();
                    let block_len = block.len() as u64;
                    let block_key_count = meta.total_key_count;

                    if self
                        .executor
                        .builder
                        .add_raw_block(block, filter_data, smallest_key, largest_key, meta)
                        .await?
                    {
                        skip_raw_block_size += block_len;
                        skip_raw_block_count += 1;
                    }
                    self.executor.may_report_process_key(block_key_count);
                    self.executor.clear();
                }
                if !first.current_sstable().is_valid() {
                    first.next_sstable().await?;
                    continue;
                }
                first.init_block_iter().await?;
            }

            let target_key = second.current_sstable().key();
            let iter = first.sstable_iter.as_mut().unwrap().iter.as_mut().unwrap();
            self.executor.run(iter, target_key).await?;
            if !iter.is_valid() {
                first.sstable_iter.as_mut().unwrap().iter.take();
                if !first.current_sstable().is_valid() {
                    first.next_sstable().await?;
                }
            }
        }
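
        // From here on, at most one input side is still valid; finish its partially consumed
        // block through the executor first, then drain its remaining blocks below.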
        let rest_data = if !self.left.is_valid() {
            &mut self.right
        } else {
            &mut self.left
        };
        if rest_data.is_valid() {
            // Compact the remaining keys of the current block.
            let sstable_iter = rest_data.sstable_iter.as_mut().unwrap();
            let target_key = FullKey::decode(&sstable_iter.sstable.meta.largest_key);
            if let Some(iter) = sstable_iter.iter.as_mut() {
                self.executor.run(iter, target_key).await?;
                assert!(
                    !iter.is_valid(),
                    "iter should not be valid key {:?}",
                    iter.key()
                );
            }
            sstable_iter.iter.take();
        }

        while rest_data.is_valid() {
            let mut sstable_iter = rest_data.sstable_iter.take().unwrap();
            while sstable_iter.is_valid() {
                let smallest_key = FullKey::decode(sstable_iter.next_block_smallest()).to_vec();
                let (block, filter_data, block_meta) =
                    sstable_iter.download_next_block().await?.unwrap();
                // If the last processed key was a delete tombstone that got dropped and this block
                // starts with the same user key, the first key of this block must be dropped as
                // well, so we cannot copy this block directly.
                let need_deleted = self.executor.last_key.user_key.eq(&smallest_key.user_key)
                    && self.executor.last_key_is_delete;
                if self.executor.builder.need_flush()
                    || need_deleted
                    || !self.executor.shall_copy_raw_block(&smallest_key.to_ref())
                {
                    let largest_key = sstable_iter.sstable.meta.largest_key.clone();
                    let target_key = FullKey::decode(&largest_key);
                    sstable_iter.init_block_iter(block, block_meta.uncompressed_size as usize)?;
                    let mut iter = sstable_iter.iter.take().unwrap();
                    self.executor.run(&mut iter, target_key).await?;
                } else {
                    let largest_key = sstable_iter.current_block_largest();
                    let block_len = block.len() as u64;
                    let block_key_count = block_meta.total_key_count;
                    if self
                        .executor
                        .builder
                        .add_raw_block(block, filter_data, smallest_key, largest_key, block_meta)
                        .await?
                    {
                        skip_raw_block_count += 1;
                        skip_raw_block_size += block_len;
                    }
                    self.executor.may_report_process_key(block_key_count);
                    self.executor.clear();
                }
            }
            rest_data.next_sstable().await?;
        }
        let mut total_read_bytes = 0;
        for sst in &self.left.sstables {
            total_read_bytes += sst.sst_size;
        }
        for sst in &self.right.sstables {
            total_read_bytes += sst.sst_size;
        }
        self.metrics
            .compact_fast_runner_bytes
            .inc_by(skip_raw_block_size);
        tracing::info!(
            "OPTIMIZATION: skip {} blocks for task-{}, optimize {}% data compression",
            skip_raw_block_count,
            self.task_id,
            skip_raw_block_size * 100 / total_read_bytes,
        );

        let statistic = self.executor.take_statistics();
        let output_ssts = self.executor.builder.finish().await?;
        Compactor::report_progress(
            self.metrics.clone(),
            Some(self.executor.task_progress.clone()),
            &output_ssts,
            false,
        );
        let sst_infos = output_ssts
            .iter()
            .map(|sst| sst.sst_info.clone())
            .collect_vec();
        assert!(can_concat(&sst_infos));
        Ok((output_ssts, statistic))
    }
}

pub struct CompactTaskExecutor<F: TableBuilderFactory, C: CompactionFilter> {
    last_key: FullKey<Vec<u8>>,
    compaction_statistics: CompactionStatistics,
    last_table_id: Option<TableId>,
    last_table_stats: TableStats,
    builder: CapacitySplitTableBuilder<F>,
    task_config: TaskConfig,
    task_progress: Arc<TaskProgress>,
    pk_prefix_skip_watermark_state: PkPrefixSkipWatermarkState,
    last_key_is_delete: bool,
    progress_key_num: u32,
    non_pk_prefix_skip_watermark_state: NonPkPrefixSkipWatermarkState,
    value_skip_watermark_state: ValueSkipWatermarkState,
    compaction_filter: C,
}

impl<F: TableBuilderFactory, C: CompactionFilter> CompactTaskExecutor<F, C> {
    pub fn new(
        builder: CapacitySplitTableBuilder<F>,
        task_config: TaskConfig,
        task_progress: Arc<TaskProgress>,
        pk_prefix_skip_watermark_state: PkPrefixSkipWatermarkState,
        non_pk_prefix_skip_watermark_state: NonPkPrefixSkipWatermarkState,
        value_skip_watermark_state: ValueSkipWatermarkState,
        compaction_filter: C,
    ) -> Self {
        Self {
            builder,
            task_config,
            last_key: FullKey::default(),
            last_key_is_delete: false,
            compaction_statistics: CompactionStatistics::default(),
            last_table_id: None,
            last_table_stats: TableStats::default(),
            task_progress,
            pk_prefix_skip_watermark_state,
            progress_key_num: 0,
            non_pk_prefix_skip_watermark_state,
            value_skip_watermark_state,
            compaction_filter,
        }
    }

    fn take_statistics(&mut self) -> CompactionStatistics {
        if let Some(last_table_id) = self.last_table_id.take() {
            self.compaction_statistics
                .delta_drop_stat
                .insert(last_table_id, std::mem::take(&mut self.last_table_stats));
        }
        std::mem::take(&mut self.compaction_statistics)
    }

    fn clear(&mut self) {
        if !self.last_key.is_empty() {
            self.last_key = FullKey::default();
        }
        self.last_key_is_delete = false;
    }

    #[inline(always)]
    fn may_report_process_key(&mut self, key_count: u32) {
        const PROGRESS_KEY_INTERVAL: u32 = 100;
        self.progress_key_num += key_count;
        if self.progress_key_num > PROGRESS_KEY_INTERVAL {
            self.task_progress
                .inc_progress_key(self.progress_key_num as u64);
            self.progress_key_num = 0;
        }
    }

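    // Sketch of the per-key drop decision in `run` below (with `retain_multiple_version` being
    // false, as the fast path configures): a version is dropped when (a) it is a delete tombstone
    // and `gc_delete_keys` is set, (b) it is an older version of a user key that has already been
    // seen, (c) the compaction filter rejects it, or (d) a table watermark covers it. Everything
    // else is forwarded to the builder via `add_full_key`.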
    pub async fn run(
        &mut self,
        iter: &mut BlockIterator,
        target_key: FullKey<&[u8]>,
    ) -> HummockResult<()> {
        self.pk_prefix_skip_watermark_state.reset_watermark();
        self.non_pk_prefix_skip_watermark_state.reset_watermark();

        while iter.is_valid() && iter.key().le(&target_key) {
            let is_new_user_key =
                !self.last_key.is_empty() && iter.key().user_key != self.last_key.user_key.as_ref();
            self.compaction_statistics.iter_total_key_counts += 1;
            self.may_report_process_key(1);

            let mut drop = false;
            let value = HummockValue::from_slice(iter.value()).unwrap();
            let is_first_or_new_user_key = is_new_user_key || self.last_key.is_empty();
            if is_first_or_new_user_key {
                self.last_key.set(iter.key());
                self.last_key_is_delete = false;
            }

            // See note in `compactor_runner.rs`.
            if !self.task_config.retain_multiple_version
                && self.task_config.gc_delete_keys
                && value.is_delete()
            {
                drop = true;
                self.last_key_is_delete = true;
            } else if !self.task_config.retain_multiple_version && !is_first_or_new_user_key {
                drop = true;
            }

            if !drop && self.compaction_filter.should_delete(iter.key()) {
                drop = true;
            }

            if !drop && self.watermark_should_delete(&iter.key(), value) {
                drop = true;
                self.last_key_is_delete = true;
            }

            if self.last_table_id != Some(self.last_key.user_key.table_id) {
                if let Some(last_table_id) = self.last_table_id.take() {
                    self.compaction_statistics
                        .delta_drop_stat
                        .insert(last_table_id, std::mem::take(&mut self.last_table_stats));
                }
                self.last_table_id = Some(self.last_key.user_key.table_id);
            }

            if drop {
                self.compaction_statistics.iter_drop_key_counts += 1;

                let should_count = match self.task_config.stats_target_table_ids.as_ref() {
                    Some(target_table_ids) => {
                        target_table_ids.contains(&self.last_key.user_key.table_id)
                    }
                    None => true,
                };
                if should_count {
                    self.last_table_stats.total_key_count -= 1;
                    self.last_table_stats.total_key_size -= self.last_key.encoded_len() as i64;
                    self.last_table_stats.total_value_size -= value.encoded_len() as i64;
                }
                iter.next();
                continue;
            }
            self.builder
                .add_full_key(iter.key(), value, is_new_user_key)
                .await?;
            iter.next();
        }
        Ok(())
    }

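    // `shall_copy_raw_block` decides whether a whole block may be forwarded without decoding. It
    // has to be conservative: if the previous user key ended in a dropped tombstone, or a
    // watermark or the compaction filter might drop the block's smallest key, the block must go
    // through the key-by-key path instead.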
    pub fn shall_copy_raw_block(&mut self, smallest_key: &FullKey<&[u8]>) -> bool {
        if self.last_key_is_delete && self.last_key.user_key.as_ref().eq(&smallest_key.user_key) {
            // If the last key is a delete tombstone, we cannot append the original block,
            // because a deleted key could then become visible to users again.
            return false;
        }

        if self.watermark_may_delete(smallest_key) {
            return false;
        }

        // Check the compaction filter.
        if self.compaction_filter.should_delete(*smallest_key) {
            return false;
        }

        true
    }

    fn watermark_may_delete(&mut self, key: &FullKey<&[u8]>) -> bool {
        // Correctness relies on the assumption that `PkPrefixSkipWatermarkState` and
        // `NonPkPrefixSkipWatermarkState` never inspect the `unused_put` value passed below.
        let pk_prefix_has_watermark = self.pk_prefix_skip_watermark_state.has_watermark();
        let non_pk_prefix_has_watermark = self.non_pk_prefix_skip_watermark_state.has_watermark();
        if pk_prefix_has_watermark || non_pk_prefix_has_watermark {
            let unused = vec![];
            let unused_put = HummockValue::Put(unused.as_slice());
            if (pk_prefix_has_watermark
                && self
                    .pk_prefix_skip_watermark_state
                    .should_delete(key, unused_put))
                || (non_pk_prefix_has_watermark
                    && self
                        .non_pk_prefix_skip_watermark_state
                        .should_delete(key, unused_put))
            {
                return true;
            }
        }
        self.value_skip_watermark_state.has_watermark()
            && self.value_skip_watermark_state.may_delete(key)
    }

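    // Unlike `watermark_may_delete`, which only sees a block's smallest key and backs the
    // raw-copy decision, this is the exact per-KV check applied while merging keys in `run`.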
    fn watermark_should_delete(
        &mut self,
        key: &FullKey<&[u8]>,
        value: HummockValue<&[u8]>,
    ) -> bool {
        (self.pk_prefix_skip_watermark_state.has_watermark()
            && self
                .pk_prefix_skip_watermark_state
                .should_delete(key, value))
            || (self.non_pk_prefix_skip_watermark_state.has_watermark()
                && self
                    .non_pk_prefix_skip_watermark_state
                    .should_delete(key, value))
            || (self.value_skip_watermark_state.has_watermark()
                && self.value_skip_watermark_state.should_delete(key, value))
    }
}