// risingwave_storage/hummock/compactor/fast_compactor_runner.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::cmp::Ordering;
16use std::collections::HashSet;
17use std::marker::PhantomData;
18use std::sync::atomic::AtomicU64;
19use std::sync::{Arc, atomic};
20use std::time::Instant;
21
22use await_tree::{InstrumentAwait, SpanExt};
23use bytes::Bytes;
24use fail::fail_point;
25use itertools::Itertools;
26use risingwave_common::catalog::TableId;
27use risingwave_hummock_sdk::compact_task::CompactTask;
28use risingwave_hummock_sdk::key::FullKey;
29use risingwave_hummock_sdk::key_range::KeyRange;
30use risingwave_hummock_sdk::sstable_info::SstableInfo;
31use risingwave_hummock_sdk::table_stats::TableStats;
32use risingwave_hummock_sdk::{EpochWithGap, LocalSstableInfo, can_concat, compact_task_to_string};
33
34use crate::compaction_catalog_manager::CompactionCatalogAgentRef;
35use crate::hummock::block_stream::BlockDataStream;
36use crate::hummock::compactor::task_progress::TaskProgress;
37use crate::hummock::compactor::{
38    CompactionFilter, CompactionStatistics, Compactor, CompactorContext, MultiCompactionFilter,
39    RemoteBuilderFactory, TaskConfig,
40};
41use crate::hummock::iterator::{
42    NonPkPrefixSkipWatermarkState, PkPrefixSkipWatermarkState, SkipWatermarkState,
43    ValueSkipWatermarkState,
44};
45use crate::hummock::multi_builder::{CapacitySplitTableBuilder, TableBuilderFactory};
46use crate::hummock::sstable_store::SstableStoreRef;
47use crate::hummock::value::HummockValue;
48use crate::hummock::{
49    Block, BlockBuilder, BlockHolder, BlockIterator, BlockMeta, BlockedXor16FilterBuilder,
50    CachePolicy, CompressionAlgorithm, GetObjectId, HummockResult, SstableBuilderOptions,
51    TableHolder, UnifiedSstableWriterFactory,
52};
53use crate::monitor::{CompactorMetrics, StoreLocalStatistic};
54
/// Iterates over the KV-pairs of an SST while downloading it.
pub struct BlockStreamIterator {
    /// The downloading stream. `None` until first use, and after a retryable IO
    /// error (the stream is then lazily re-created by `download_next_block`).
    block_stream: Option<BlockDataStream>,

    /// Index into `sstable.meta.block_metas` of the next block to download.
    next_block_index: usize,

    /// For key sanity check of divided SST and debugging
    sstable: TableHolder,
    /// Iterator over the KV-pairs of the currently decoded block, if any.
    iter: Option<BlockIterator>,
    /// Progress tracker; its pending-read-IO counter is decremented in `Drop`.
    task_progress: Arc<TaskProgress>,

    // For block stream recreate
    sstable_store: SstableStoreRef,
    sstable_info: SstableInfo,
    /// Number of stream re-creations performed so far after IO errors.
    io_retry_times: usize,
    /// Upper bound on stream re-creation attempts.
    max_io_retry_times: usize,
    /// Accumulates remote-IO wall-clock time in milliseconds
    /// (cloned from `StoreLocalStatistic::remote_io_time`).
    stats_ptr: Arc<AtomicU64>,
}
74
impl BlockStreamIterator {
    // We have to handle two internal iterators.
    //   `block_stream`: iterates over the blocks of the table.
    //     `block_iter`: iterates over the KV-pairs of the current block.
    // These iterators work in different ways.

    // BlockIterator works as follows: After new(), we call seek(). That brings us
    // to the first element. Calling next() then brings us to the second element and does not
    // return anything.

    // BlockStream follows a different approach. After new(), we do not seek, instead next()
    // returns the first value.

    /// Initialises a new [`BlockStreamIterator`] which iterates over the blocks of the
    /// given sstable. The download stream itself is created lazily on the first call to
    /// [`Self::download_next_block`].
    pub fn new(
        sstable: TableHolder,
        task_progress: Arc<TaskProgress>,
        sstable_store: SstableStoreRef,
        sstable_info: SstableInfo,
        max_io_retry_times: usize,
        stats_ptr: Arc<AtomicU64>,
    ) -> Self {
        Self {
            block_stream: None,
            next_block_index: 0,
            sstable,
            iter: None,
            task_progress,
            sstable_store,
            sstable_info,
            io_retry_times: 0,
            max_io_retry_times,
            stats_ptr,
        }
    }

    /// (Re-)creates the download stream starting at `next_block_index`, so a retry
    /// resumes exactly where the previous stream failed.
    async fn create_stream(&mut self) -> HummockResult<()> {
        // Fast compact only support the single table compaction.(not split sst)
        // So we don't need to filter the block_metas with table_id and key_range
        let block_stream = self
            .sstable_store
            .get_stream_for_blocks(
                self.sstable_info.object_id,
                &self.sstable.meta.block_metas[self.next_block_index..],
            )
            .instrument_await("stream_iter_get_stream".verbose())
            .await?;
        self.block_stream = Some(block_stream);
        Ok(())
    }

    /// Wrapper function for `self.block_stream.next()` which allows us to measure the time needed.
    ///
    /// Returns `(block data, raw filter data, block meta)` for the next block, or `None`
    /// once all blocks are consumed. Object-store errors are retried up to
    /// `max_io_retry_times` times by re-creating the stream.
    pub(crate) async fn download_next_block(
        &mut self,
    ) -> HummockResult<Option<(Bytes, Vec<u8>, BlockMeta)>> {
        let now = Instant::now();
        // Charge the elapsed time (ms) to `stats_ptr` on every exit path,
        // including early error returns.
        let _time_stat = scopeguard::guard(self.stats_ptr.clone(), |stats_ptr: Arc<AtomicU64>| {
            let add = (now.elapsed().as_secs_f64() * 1000.0).ceil();
            stats_ptr.fetch_add(add as u64, atomic::Ordering::Relaxed);
        });
        loop {
            let ret = match &mut self.block_stream {
                Some(block_stream) => block_stream.next_block_impl().await,
                None => {
                    // No stream yet (first call, or dropped after an error): create and retry.
                    self.create_stream().await?;
                    continue;
                }
            };
            match ret {
                Ok(Some((data, _))) => {
                    let meta = self.sstable.meta.block_metas[self.next_block_index].clone();
                    let filter_block = self
                        .sstable
                        .filter_reader
                        .get_block_raw_filter(self.next_block_index);
                    self.next_block_index += 1;
                    return Ok(Some((data, filter_block, meta)));
                }

                Ok(None) => break,

                Err(e) => {
                    // Only object-store errors are retryable; bail out once the
                    // retry budget is exhausted.
                    if !e.is_object_error() || self.io_retry_times >= self.max_io_retry_times {
                        return Err(e);
                    }

                    // Drop the broken stream; the next loop iteration re-creates it
                    // starting at `next_block_index`.
                    self.block_stream.take();
                    self.io_retry_times += 1;
                    fail_point!("create_stream_err");

                    tracing::warn!(
                        "fast compact retry create stream for sstable {} times, sstinfo={}",
                        self.io_retry_times,
                        format!(
                            "object_id={}, sst_id={}, meta_offset={}, table_ids={:?}",
                            self.sstable_info.object_id,
                            self.sstable_info.sst_id,
                            self.sstable_info.meta_offset,
                            self.sstable_info.table_ids
                        )
                    );
                }
            }
        }

        // Stream exhausted: mark the sstable as fully consumed.
        self.next_block_index = self.sstable.meta.block_metas.len();
        self.iter.take();
        Ok(None)
    }

    /// Decodes `buf` into a block and opens a [`BlockIterator`] positioned on its
    /// first key.
    pub(crate) fn init_block_iter(
        &mut self,
        buf: Bytes,
        uncompressed_capacity: usize,
    ) -> HummockResult<()> {
        let block = Block::decode(buf, uncompressed_capacity)?;
        let mut iter = BlockIterator::new(BlockHolder::from_owned_block(Box::new(block)));
        iter.seek_to_first();
        self.iter = Some(iter);
        Ok(())
    }

    /// Smallest key of the next not-yet-downloaded block.
    fn next_block_smallest(&self) -> &[u8] {
        self.sstable.meta.block_metas[self.next_block_index]
            .smallest_key
            .as_ref()
    }

    /// Upper bound of the next block: the smallest key of the block after it, or the
    /// sstable's largest key when the next block is the last one.
    fn next_block_largest(&self) -> &[u8] {
        if self.next_block_index + 1 < self.sstable.meta.block_metas.len() {
            self.sstable.meta.block_metas[self.next_block_index + 1]
                .smallest_key
                .as_ref()
        } else {
            self.sstable.meta.largest_key.as_ref()
        }
    }

    /// Largest key of the most recently downloaded block (`next_block_index` has
    /// already been advanced past it).
    fn current_block_largest(&self) -> Vec<u8> {
        if self.next_block_index < self.sstable.meta.block_metas.len() {
            let mut largest_key = FullKey::decode(
                self.sstable.meta.block_metas[self.next_block_index]
                    .smallest_key
                    .as_ref(),
            );
            // do not include this key because it is the smallest key of next block.
            largest_key.epoch_with_gap = EpochWithGap::new_max_epoch();
            largest_key.encode()
        } else {
            self.sstable.meta.largest_key.clone()
        }
    }

    /// Current position: the open block iterator's key, or the smallest key of the
    /// next block to download when no block is open.
    fn key(&self) -> FullKey<&[u8]> {
        match self.iter.as_ref() {
            Some(iter) => iter.key(),
            None => FullKey::decode(
                self.sstable.meta.block_metas[self.next_block_index]
                    .smallest_key
                    .as_ref(),
            ),
        }
    }

    /// True while a block iterator is open or undownloaded blocks remain.
    pub(crate) fn is_valid(&self) -> bool {
        self.iter.is_some() || self.next_block_index < self.sstable.meta.block_metas.len()
    }

    #[cfg(test)]
    #[cfg(feature = "failpoints")]
    pub(crate) fn iter_mut(&mut self) -> &mut BlockIterator {
        self.iter.as_mut().unwrap()
    }
}
250
impl Drop for BlockStreamIterator {
    fn drop(&mut self) {
        // Balance the `inc_num_pending_read_io` performed by
        // `ConcatSstableIterator::seek_idx` when this iterator was created.
        self.task_progress.dec_num_pending_read_io();
    }
}
256
/// Iterates over the KV-pairs of a given list of SSTs. The key-ranges of these SSTs are assumed to
/// be consecutive and non-overlapping.
pub struct ConcatSstableIterator {
    /// The iterator of the current table.
    sstable_iter: Option<BlockStreamIterator>,

    /// Current table index.
    cur_idx: usize,

    /// All non-overlapping tables.
    sstables: Vec<SstableInfo>,

    /// Used to load sstable metas and create per-sstable block streams.
    sstable_store: SstableStoreRef,

    /// Local statistics; `remote_io_time` is shared with each `BlockStreamIterator`.
    stats: StoreLocalStatistic,
    /// Progress tracker handed to every `BlockStreamIterator` created by `seek_idx`.
    task_progress: Arc<TaskProgress>,

    /// IO-retry budget forwarded to each `BlockStreamIterator`.
    max_io_retry_times: usize,
}
276
impl ConcatSstableIterator {
    /// Caller should make sure that `tables` are non-overlapping,
    /// arranged in ascending order when it serves as a forward iterator,
    /// and arranged in descending order when it serves as a backward iterator.
    pub fn new(
        sst_infos: Vec<SstableInfo>,
        sstable_store: SstableStoreRef,
        task_progress: Arc<TaskProgress>,
        max_io_retry_times: usize,
    ) -> Self {
        Self {
            sstable_iter: None,
            cur_idx: 0,
            sstables: sst_infos,
            sstable_store,
            task_progress,
            stats: StoreLocalStatistic::default(),
            max_io_retry_times,
        }
    }

    /// Positions the iterator on the first sstable.
    pub async fn rewind(&mut self) -> HummockResult<()> {
        self.seek_idx(0).await
    }

    /// Advances to the next sstable in the list.
    pub async fn next_sstable(&mut self) -> HummockResult<()> {
        self.seek_idx(self.cur_idx + 1).await
    }

    /// Returns the iterator over the current sstable.
    ///
    /// Panics if the iterator is exhausted (`!is_valid()`).
    pub fn current_sstable(&mut self) -> &mut BlockStreamIterator {
        self.sstable_iter.as_mut().unwrap()
    }

    /// Downloads the next block of the current sstable and opens a block iterator on
    /// it, unless one is already open.
    ///
    /// Panics (via `unwrap`) if the current sstable has no blocks left to download —
    /// callers must check `current_sstable().is_valid()` first.
    pub async fn init_block_iter(&mut self) -> HummockResult<()> {
        if let Some(sstable) = self.sstable_iter.as_mut() {
            if sstable.iter.is_some() {
                return Ok(());
            }
            let (buf, _, meta) = sstable.download_next_block().await?.unwrap();
            sstable.init_block_iter(buf, meta.uncompressed_size as usize)?;
        }
        Ok(())
    }

    /// True while there are sstables left to consume.
    pub fn is_valid(&self) -> bool {
        self.cur_idx < self.sstables.len()
    }

    /// Resets the iterator and loads the SST at index `idx`, if one exists.
    async fn seek_idx(&mut self, idx: usize) -> HummockResult<()> {
        self.sstable_iter.take();
        self.cur_idx = idx;
        if self.cur_idx < self.sstables.len() {
            let sstable_info = &self.sstables[self.cur_idx];
            let sstable = self
                .sstable_store
                .sstable(sstable_info, &mut self.stats)
                .instrument_await("stream_iter_sstable".verbose())
                .await?;
            // Counted as pending read IO until the `BlockStreamIterator` is dropped.
            self.task_progress.inc_num_pending_read_io();

            let sstable_iter = BlockStreamIterator::new(
                sstable,
                self.task_progress.clone(),
                self.sstable_store.clone(),
                sstable_info.clone(),
                self.max_io_retry_times,
                self.stats.remote_io_time.clone(),
            );
            self.sstable_iter = Some(sstable_iter);
        }
        Ok(())
    }
}
351
/// Fast-path compaction runner: merges exactly two sorted input levels, copying whole
/// raw blocks into the output whenever possible instead of re-encoding every KV-pair.
pub struct CompactorRunner<C: CompactionFilter = MultiCompactionFilter> {
    /// Streaming iterator over the first input level (`input_ssts[0]`).
    left: Box<ConcatSstableIterator>,
    /// Streaming iterator over the second input level (`input_ssts[1]`).
    right: Box<ConcatSstableIterator>,
    task_id: u64,
    /// KV-level merge executor used when a block cannot be copied verbatim.
    executor: CompactTaskExecutor<
        RemoteBuilderFactory<UnifiedSstableWriterFactory, BlockedXor16FilterBuilder>,
        C,
    >,
    /// Target compression algorithm for output blocks.
    compression_algorithm: CompressionAlgorithm,
    metrics: Arc<CompactorMetrics>,
}
363
364impl<C: CompactionFilter> CompactorRunner<C> {
365    pub fn new(
366        context: CompactorContext,
367        task: CompactTask,
368        compaction_catalog_agent_ref: CompactionCatalogAgentRef,
369        object_id_getter: Arc<dyn GetObjectId>,
370        task_progress: Arc<TaskProgress>,
371        compaction_filter: C,
372    ) -> Self {
373        let mut options: SstableBuilderOptions = context.storage_opts.as_ref().into();
374        let compression_algorithm: CompressionAlgorithm = task.compression_algorithm.into();
375        options.compression_algorithm = compression_algorithm;
376        options.capacity = task.target_file_size as usize;
377        // Disable vnode key-range hints for fast compaction path by default.
378        options.max_vnode_key_range_bytes = None;
379        let get_id_time = Arc::new(AtomicU64::new(0));
380
381        let key_range = KeyRange::inf();
382
383        let task_config = TaskConfig {
384            key_range,
385            cache_policy: CachePolicy::NotFill,
386            gc_delete_keys: task.gc_delete_keys,
387            retain_multiple_version: false,
388            stats_target_table_ids: Some(HashSet::from_iter(task.existing_table_ids.clone())),
389            table_vnode_partition: task.table_vnode_partition.clone(),
390            use_block_based_filter: true,
391            table_schemas: Default::default(),
392            disable_drop_column_optimization: false,
393        };
394        let factory = UnifiedSstableWriterFactory::new(context.sstable_store.clone());
395
396        let builder_factory = RemoteBuilderFactory::<_, BlockedXor16FilterBuilder> {
397            object_id_getter,
398            limiter: context.memory_limiter.clone(),
399            options,
400            policy: task_config.cache_policy,
401            remote_rpc_cost: get_id_time,
402            compaction_catalog_agent_ref: compaction_catalog_agent_ref.clone(),
403            sstable_writer_factory: factory,
404            _phantom: PhantomData,
405        };
406        let sst_builder = CapacitySplitTableBuilder::new(
407            builder_factory,
408            context.compactor_metrics.clone(),
409            Some(task_progress.clone()),
410            task_config.table_vnode_partition.clone(),
411            context
412                .storage_opts
413                .compactor_concurrent_uploading_sst_count,
414            compaction_catalog_agent_ref.clone(),
415        );
416        assert_eq!(
417            task.input_ssts.len(),
418            2,
419            "TaskId {} target_level {:?} task {:?}",
420            task.task_id,
421            task.target_level,
422            compact_task_to_string(&task)
423        );
424        let left = Box::new(ConcatSstableIterator::new(
425            task.input_ssts[0].table_infos.clone(),
426            context.sstable_store.clone(),
427            task_progress.clone(),
428            context.storage_opts.compactor_iter_max_io_retry_times,
429        ));
430        let right = Box::new(ConcatSstableIterator::new(
431            task.input_ssts[1].table_infos.clone(),
432            context.sstable_store,
433            task_progress.clone(),
434            context.storage_opts.compactor_iter_max_io_retry_times,
435        ));
436
437        // Can not consume the watermarks because the watermarks may be used by `check_compact_result`.
438        let pk_prefix_state = PkPrefixSkipWatermarkState::from_safe_epoch_watermarks(
439            task.pk_prefix_table_watermarks.clone(),
440        );
441        let non_pk_prefix_state = NonPkPrefixSkipWatermarkState::from_safe_epoch_watermarks(
442            task.non_pk_prefix_table_watermarks.clone(),
443            compaction_catalog_agent_ref.clone(),
444        );
445        let value_skip_watermark_state = ValueSkipWatermarkState::from_safe_epoch_watermarks(
446            task.value_table_watermarks.clone(),
447            compaction_catalog_agent_ref,
448        );
449
450        let compact_table_ids = HashSet::from_iter(task.build_compact_table_ids());
451
452        Self {
453            executor: CompactTaskExecutor::new(
454                sst_builder,
455                task_config,
456                task_progress,
457                pk_prefix_state,
458                non_pk_prefix_state,
459                value_skip_watermark_state,
460                compaction_filter,
461                compact_table_ids,
462            ),
463            left,
464            right,
465            task_id: task.task_id,
466            metrics: context.compactor_metrics,
467            compression_algorithm,
468        }
469    }
470
471    pub async fn run(mut self) -> HummockResult<(Vec<LocalSstableInfo>, CompactionStatistics)> {
472        self.left.rewind().await?;
473        self.right.rewind().await?;
474        let mut skip_raw_block_count = 0;
475        let mut skip_raw_block_size = 0;
476        while self.left.is_valid() && self.right.is_valid() {
477            let ret = self
478                .left
479                .current_sstable()
480                .key()
481                .cmp(&self.right.current_sstable().key());
482            let (first, second) = if ret == Ordering::Less {
483                (&mut self.left, &mut self.right)
484            } else {
485                (&mut self.right, &mut self.left)
486            };
487            assert!(
488                ret != Ordering::Equal,
489                "sst range overlap equal_key {:?}",
490                self.left.current_sstable().key()
491            );
492            if first.current_sstable().iter.is_none() {
493                let right_key = second.current_sstable().key();
494                while first.current_sstable().is_valid() && !self.executor.builder.need_flush() {
495                    let full_key = FullKey::decode(first.current_sstable().next_block_largest());
496                    // the full key may be either Excluded key or Included key, so we do not allow
497                    // they equals.
498                    if full_key.user_key.ge(&right_key.user_key) {
499                        break;
500                    }
501                    let smallest_key =
502                        FullKey::decode(first.current_sstable().next_block_smallest());
503                    if !self.executor.shall_copy_raw_block(&smallest_key) {
504                        break;
505                    }
506                    let smallest_key = smallest_key.to_vec();
507
508                    let (mut block, filter_data, mut meta) = first
509                        .current_sstable()
510                        .download_next_block()
511                        .await?
512                        .unwrap();
513                    let algorithm = Block::get_algorithm(&block)?;
514                    if algorithm == CompressionAlgorithm::None
515                        && algorithm != self.compression_algorithm
516                    {
517                        block = BlockBuilder::compress_block(block, self.compression_algorithm)?;
518                        meta.len = block.len() as u32;
519                    }
520
521                    let largest_key = first.current_sstable().current_block_largest();
522                    let block_len = block.len() as u64;
523                    let block_key_count = meta.total_key_count;
524
525                    if self
526                        .executor
527                        .builder
528                        .add_raw_block(block, filter_data, smallest_key, largest_key, meta)
529                        .await?
530                    {
531                        skip_raw_block_size += block_len;
532                        skip_raw_block_count += 1;
533                    }
534                    self.executor.may_report_process_key(block_key_count);
535                    self.executor.clear();
536                }
537                if !first.current_sstable().is_valid() {
538                    first.next_sstable().await?;
539                    continue;
540                }
541                first.init_block_iter().await?;
542            }
543
544            let target_key = second.current_sstable().key();
545            let iter = first.sstable_iter.as_mut().unwrap().iter.as_mut().unwrap();
546            self.executor.reset_watermark();
547            self.executor.run(iter, target_key).await?;
548            if !iter.is_valid() {
549                first.sstable_iter.as_mut().unwrap().iter.take();
550                if !first.current_sstable().is_valid() {
551                    first.next_sstable().await?;
552                }
553            }
554        }
555        let rest_data = if !self.left.is_valid() {
556            &mut self.right
557        } else {
558            &mut self.left
559        };
560        if rest_data.is_valid() {
561            // compact rest keys of the current block.
562            let sstable_iter = rest_data.sstable_iter.as_mut().unwrap();
563            let target_key = FullKey::decode(&sstable_iter.sstable.meta.largest_key);
564            if let Some(iter) = sstable_iter.iter.as_mut() {
565                self.executor.reset_watermark();
566                self.executor.run(iter, target_key).await?;
567                assert!(
568                    !iter.is_valid(),
569                    "iter should not be valid key {:?}",
570                    iter.key()
571                );
572            }
573            sstable_iter.iter.take();
574        }
575
576        while rest_data.is_valid() {
577            let mut sstable_iter = rest_data.sstable_iter.take().unwrap();
578            while sstable_iter.is_valid() {
579                let smallest_key = FullKey::decode(sstable_iter.next_block_smallest()).to_vec();
580                let (block, filter_data, block_meta) =
581                    sstable_iter.download_next_block().await?.unwrap();
582                // If the last key is tombstone and it was deleted, the first key of this block must be deleted. So we can not move this block directly.
583                let need_deleted = self.executor.last_key.user_key.eq(&smallest_key.user_key)
584                    && self.executor.last_key_is_delete;
585                if self.executor.builder.need_flush()
586                    || need_deleted
587                    || !self.executor.shall_copy_raw_block(&smallest_key.to_ref())
588                {
589                    let largest_key = sstable_iter.sstable.meta.largest_key.clone();
590                    let target_key = FullKey::decode(&largest_key);
591                    sstable_iter.init_block_iter(block, block_meta.uncompressed_size as usize)?;
592                    let mut iter = sstable_iter.iter.take().unwrap();
593                    self.executor.reset_watermark();
594                    self.executor.run(&mut iter, target_key).await?;
595                } else {
596                    let largest_key = sstable_iter.current_block_largest();
597                    let block_len = block.len() as u64;
598                    let block_key_count = block_meta.total_key_count;
599                    if self
600                        .executor
601                        .builder
602                        .add_raw_block(block, filter_data, smallest_key, largest_key, block_meta)
603                        .await?
604                    {
605                        skip_raw_block_count += 1;
606                        skip_raw_block_size += block_len;
607                    }
608                    self.executor.may_report_process_key(block_key_count);
609                    self.executor.clear();
610                }
611            }
612            rest_data.next_sstable().await?;
613        }
614        let mut total_read_bytes = 0;
615        for sst in &self.left.sstables {
616            total_read_bytes += sst.sst_size;
617        }
618        for sst in &self.right.sstables {
619            total_read_bytes += sst.sst_size;
620        }
621        self.metrics
622            .compact_fast_runner_bytes
623            .inc_by(skip_raw_block_size);
624        tracing::info!(
625            "OPTIMIZATION: skip {} blocks for task-{}, optimize {}% data compression",
626            skip_raw_block_count,
627            self.task_id,
628            skip_raw_block_size * 100 / total_read_bytes,
629        );
630
631        let statistic = self.executor.take_statistics();
632        let output_ssts = self.executor.builder.finish().await?;
633        Compactor::report_progress(
634            self.metrics.clone(),
635            Some(self.executor.task_progress.clone()),
636            &output_ssts,
637            false,
638        );
639        let sst_infos = output_ssts
640            .iter()
641            .map(|sst| sst.sst_info.clone())
642            .collect_vec();
643        assert!(can_concat(&sst_infos));
644        Ok((output_ssts, statistic))
645    }
646}
647
/// KV-level merge executor used by `CompactorRunner` for key ranges whose blocks
/// cannot be copied verbatim into the output.
pub struct CompactTaskExecutor<F: TableBuilderFactory, C: CompactionFilter> {
    /// Most recently observed full key; empty right after `clear`.
    last_key: FullKey<Vec<u8>>,
    compaction_statistics: CompactionStatistics,
    /// Table id whose drop-stats are currently accumulating in `last_table_stats`.
    last_table_id: Option<TableId>,
    last_table_stats: TableStats,
    /// Output SST builder, shared with the raw-block copy path in `CompactorRunner`.
    builder: CapacitySplitTableBuilder<F>,
    task_config: TaskConfig,
    task_progress: Arc<TaskProgress>,
    pk_prefix_skip_watermark_state: PkPrefixSkipWatermarkState,
    /// Whether `last_key` was a dropped delete-tombstone; a following block starting
    /// with the same user key must then not be copied raw.
    last_key_is_delete: bool,
    /// Keys processed since the last batched progress report.
    progress_key_num: u32,
    non_pk_prefix_skip_watermark_state: NonPkPrefixSkipWatermarkState,
    value_skip_watermark_state: ValueSkipWatermarkState,
    compaction_filter: C,
    /// Table ids belonging to this compaction; blocks of other tables are skipped.
    compact_table_ids: HashSet<TableId>,
}
664
665impl<F: TableBuilderFactory, C: CompactionFilter> CompactTaskExecutor<F, C> {
    /// Creates a new executor; all mutable bookkeeping (`last_key`, statistics,
    /// progress counters) starts in its cleared/default state.
    pub fn new(
        builder: CapacitySplitTableBuilder<F>,
        task_config: TaskConfig,
        task_progress: Arc<TaskProgress>,
        pk_prefix_skip_watermark_state: PkPrefixSkipWatermarkState,
        non_pk_prefix_skip_watermark_state: NonPkPrefixSkipWatermarkState,
        value_skip_watermark_state: ValueSkipWatermarkState,
        compaction_filter: C,
        compact_table_ids: HashSet<TableId>,
    ) -> Self {
        Self {
            builder,
            task_config,
            last_key: FullKey::default(),
            last_key_is_delete: false,
            compaction_statistics: CompactionStatistics::default(),
            last_table_id: None,
            last_table_stats: TableStats::default(),
            task_progress,
            pk_prefix_skip_watermark_state,
            progress_key_num: 0,
            non_pk_prefix_skip_watermark_state,
            value_skip_watermark_state,
            compaction_filter,
            compact_table_ids,
        }
    }
693
694    fn take_statistics(&mut self) -> CompactionStatistics {
695        if let Some(last_table_id) = self.last_table_id.take() {
696            self.compaction_statistics
697                .delta_drop_stat
698                .insert(last_table_id, std::mem::take(&mut self.last_table_stats));
699        }
700        std::mem::take(&mut self.compaction_statistics)
701    }
702
703    fn clear(&mut self) {
704        if !self.last_key.is_empty() {
705            self.last_key = FullKey::default();
706        }
707        self.last_key_is_delete = false;
708    }
709
    /// Rewinds all three watermark filters so the next merge pass starts from scratch.
    fn reset_watermark(&mut self) {
        self.pk_prefix_skip_watermark_state.reset_watermark();
        self.non_pk_prefix_skip_watermark_state.reset_watermark();
        self.value_skip_watermark_state.reset_watermark();
    }
715
    /// A block belonging to a table outside this compaction's table set is skipped.
    #[inline(always)]
    fn should_skip_block(&self, table_id: TableId) -> bool {
        !self.compact_table_ids.contains(&table_id)
    }
720
721    #[inline(always)]
722    fn may_report_process_key(&mut self, key_count: u32) {
723        const PROGRESS_KEY_INTERVAL: u32 = 100;
724        self.progress_key_num += key_count;
725        if self.progress_key_num > PROGRESS_KEY_INTERVAL {
726            self.task_progress
727                .inc_progress_key(self.progress_key_num as u64);
728            self.progress_key_num = 0;
729        }
730    }
731
    /// Merges the KV-pairs of `iter` into the output builder, up to and including
    /// `target_key`.
    ///
    /// A key is dropped when it is a GC-able delete tombstone, an older (shadowed)
    /// version of an already-emitted user key, rejected by the compaction filter, or
    /// covered by a watermark. Dropped keys are charged as negative deltas to the
    /// per-table statistics.
    pub async fn run(
        &mut self,
        iter: &mut BlockIterator,
        target_key: FullKey<&[u8]>,
    ) -> HummockResult<()> {
        // Blocks of tables outside this compaction are consumed without output.
        if self.should_skip_block(iter.table_id()) {
            iter.finish_block();
            return Ok(());
        }

        while iter.is_valid() && iter.key().le(&target_key) {
            let is_new_user_key =
                !self.last_key.is_empty() && iter.key().user_key != self.last_key.user_key.as_ref();
            self.compaction_statistics.iter_total_key_counts += 1;
            self.may_report_process_key(1);

            let mut drop = false;
            let value = HummockValue::from_slice(iter.value()).unwrap();
            let is_first_or_new_user_key = is_new_user_key || self.last_key.is_empty();
            if is_first_or_new_user_key {
                // Remember the newest version of this user key before drop checks.
                self.last_key.set(iter.key());
                self.last_key_is_delete = false;
            }

            // See note in `compactor_runner.rs`.
            if !self.task_config.retain_multiple_version
                && self.task_config.gc_delete_keys
                && value.is_delete()
            {
                drop = true;
                self.last_key_is_delete = true;
            } else if !self.task_config.retain_multiple_version && !is_first_or_new_user_key {
                // Older version of a user key we already handled: shadowed, drop it.
                drop = true;
            }

            if !drop && self.compaction_filter.should_delete(iter.key()) {
                drop = true;
            }

            if !drop && self.watermark_should_delete(&iter.key(), value) {
                drop = true;
                self.last_key_is_delete = true;
            }

            // Flush per-table drop-stats when crossing a table boundary.
            if self.last_table_id != Some(self.last_key.user_key.table_id) {
                if let Some(last_table_id) = self.last_table_id.take() {
                    self.compaction_statistics
                        .delta_drop_stat
                        .insert(last_table_id, std::mem::take(&mut self.last_table_stats));
                }
                self.last_table_id = Some(self.last_key.user_key.table_id);
            }

            if drop {
                self.compaction_statistics.iter_drop_key_counts += 1;

                let should_count = match self.task_config.stats_target_table_ids.as_ref() {
                    Some(target_table_ids) => {
                        target_table_ids.contains(&self.last_key.user_key.table_id)
                    }
                    None => true,
                };
                if should_count {
                    // Negative deltas: these stats record what compaction removed.
                    self.last_table_stats.total_key_count -= 1;
                    self.last_table_stats.total_key_size -= self.last_key.encoded_len() as i64;
                    self.last_table_stats.total_value_size -= value.encoded_len() as i64;
                }
                iter.next();
                continue;
            }
            self.builder
                .add_full_key(iter.key(), value, is_new_user_key)
                .await?;
            iter.next();
        }
        Ok(())
    }
809
810    pub fn shall_copy_raw_block(&mut self, smallest_key: &FullKey<&[u8]>) -> bool {
811        if self.should_skip_block(smallest_key.user_key.table_id) {
812            // If the table id of smallest key is not in compact_table_ids, we can not copy the raw block.
813            return false;
814        }
815
816        if self.last_key_is_delete && self.last_key.user_key.as_ref().eq(&smallest_key.user_key) {
817            // If the last key is delete tombstone, we can not append the origin block
818            // because it would cause a deleted key could be see by user again.
819            return false;
820        }
821
822        if self.watermark_may_delete(smallest_key) {
823            return false;
824        }
825
826        // Check compaction filter
827        if self.compaction_filter.should_delete(*smallest_key) {
828            return false;
829        }
830
831        true
832    }
833
834    fn watermark_may_delete(&mut self, key: &FullKey<&[u8]>) -> bool {
835        // Correctness requires the assumption that these PkPrefixSkipWatermarkState and NonPkPrefixSkipWatermarkState never use the `unused_put`.
836        let pk_prefix_has_watermark = self.pk_prefix_skip_watermark_state.has_watermark();
837        let non_pk_prefix_has_watermark = self.non_pk_prefix_skip_watermark_state.has_watermark();
838        if pk_prefix_has_watermark || non_pk_prefix_has_watermark {
839            let unused = vec![];
840            let unused_put = HummockValue::Put(unused.as_slice());
841            if (pk_prefix_has_watermark
842                && self
843                    .pk_prefix_skip_watermark_state
844                    .should_delete(key, unused_put))
845                || (non_pk_prefix_has_watermark
846                    && self
847                        .non_pk_prefix_skip_watermark_state
848                        .should_delete(key, unused_put))
849            {
850                return true;
851            }
852        }
853        self.value_skip_watermark_state.has_watermark()
854            && self.value_skip_watermark_state.may_delete(key)
855    }
856
857    fn watermark_should_delete(
858        &mut self,
859        key: &FullKey<&[u8]>,
860        value: HummockValue<&[u8]>,
861    ) -> bool {
862        (self.pk_prefix_skip_watermark_state.has_watermark()
863            && self
864                .pk_prefix_skip_watermark_state
865                .should_delete(key, value))
866            || (self.non_pk_prefix_skip_watermark_state.has_watermark()
867                && self
868                    .non_pk_prefix_skip_watermark_state
869                    .should_delete(key, value))
870            || (self.value_skip_watermark_state.has_watermark()
871                && self.value_skip_watermark_state.should_delete(key, value))
872    }
873}