risingwave_storage/hummock/compactor/fast_compactor_runner.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::cmp::Ordering;
use std::collections::HashSet;
use std::marker::PhantomData;
use std::sync::atomic::AtomicU64;
use std::sync::{Arc, atomic};
use std::time::Instant;

use await_tree::{InstrumentAwait, SpanExt};
use bytes::Bytes;
use fail::fail_point;
use itertools::Itertools;
use risingwave_common::catalog::TableId;
use risingwave_hummock_sdk::compact_task::CompactTask;
use risingwave_hummock_sdk::key::FullKey;
use risingwave_hummock_sdk::key_range::KeyRange;
use risingwave_hummock_sdk::sstable_info::SstableInfo;
use risingwave_hummock_sdk::table_stats::TableStats;
use risingwave_hummock_sdk::{EpochWithGap, LocalSstableInfo, can_concat, compact_task_to_string};

use crate::compaction_catalog_manager::CompactionCatalogAgentRef;
use crate::hummock::block_stream::BlockDataStream;
use crate::hummock::compactor::task_progress::TaskProgress;
use crate::hummock::compactor::{
    CompactionFilter, CompactionStatistics, Compactor, CompactorContext, MultiCompactionFilter,
    RemoteBuilderFactory, TaskConfig,
};
use crate::hummock::iterator::{
    NonPkPrefixSkipWatermarkState, PkPrefixSkipWatermarkState, SkipWatermarkState,
};
use crate::hummock::multi_builder::{CapacitySplitTableBuilder, TableBuilderFactory};
use crate::hummock::sstable_store::SstableStoreRef;
use crate::hummock::value::HummockValue;
use crate::hummock::{
    Block, BlockBuilder, BlockHolder, BlockIterator, BlockMeta, BlockedXor16FilterBuilder,
    CachePolicy, CompressionAlgorithm, GetObjectId, HummockResult, SstableBuilderOptions,
    TableHolder, UnifiedSstableWriterFactory,
};
use crate::monitor::{CompactorMetrics, StoreLocalStatistic};

/// Iterates over the KV-pairs of an SST while downloading it.
pub struct BlockStreamIterator {
    /// The downloading stream.
    block_stream: Option<BlockDataStream>,

    next_block_index: usize,

    /// For key sanity checks of the divided SST and for debugging.
    sstable: TableHolder,
    iter: Option<BlockIterator>,
    task_progress: Arc<TaskProgress>,

    // For block stream recreation.
    sstable_store: SstableStoreRef,
    sstable_info: SstableInfo,
    io_retry_times: usize,
    max_io_retry_times: usize,
    stats_ptr: Arc<AtomicU64>,
}

impl BlockStreamIterator {
    // We have to handle two internal iterators.
    //   `block_stream`: iterates over the blocks of the table.
    //     `block_iter`: iterates over the KV-pairs of the current block.
    // These iterators work in different ways.

    // `BlockIterator` works as follows: after `new()`, we call `seek()`, which brings us to the
    // first element. Calling `next()` then advances to the second element and does not return
    // anything.

    // `BlockStream` follows a different approach: after `new()`, we do not seek; instead, `next()`
    // returns the first value.

    /// Initialises a new [`BlockStreamIterator`] which streams the blocks of the given SST and
    /// iterates over their KV-pairs.
    pub fn new(
        sstable: TableHolder,
        task_progress: Arc<TaskProgress>,
        sstable_store: SstableStoreRef,
        sstable_info: SstableInfo,
        max_io_retry_times: usize,
        stats_ptr: Arc<AtomicU64>,
    ) -> Self {
        Self {
            block_stream: None,
            next_block_index: 0,
            sstable,
            iter: None,
            task_progress,
            sstable_store,
            sstable_info,
            io_retry_times: 0,
            max_io_retry_times,
            stats_ptr,
        }
    }

    async fn create_stream(&mut self) -> HummockResult<()> {
        // Fast compaction only supports single-table compaction (the SST is not split),
        // so we don't need to filter the block_metas by table_id or key_range.
        let block_stream = self
            .sstable_store
            .get_stream_for_blocks(
                self.sstable_info.object_id,
                &self.sstable.meta.block_metas[self.next_block_index..],
            )
            .instrument_await("stream_iter_get_stream".verbose())
            .await?;
        self.block_stream = Some(block_stream);
        Ok(())
    }

    /// Wrapper around the block stream's `next` which measures the time spent downloading and
    /// recreates the stream on retryable object-store errors.
    pub(crate) async fn download_next_block(
        &mut self,
    ) -> HummockResult<Option<(Bytes, Vec<u8>, BlockMeta)>> {
        let now = Instant::now();
        let _time_stat = scopeguard::guard(self.stats_ptr.clone(), |stats_ptr: Arc<AtomicU64>| {
            let add = (now.elapsed().as_secs_f64() * 1000.0).ceil();
            stats_ptr.fetch_add(add as u64, atomic::Ordering::Relaxed);
        });
        loop {
            let ret = match &mut self.block_stream {
                Some(block_stream) => block_stream.next_block_impl().await,
                None => {
                    self.create_stream().await?;
                    continue;
                }
            };
            match ret {
                Ok(Some((data, _))) => {
                    let meta = self.sstable.meta.block_metas[self.next_block_index].clone();
                    let filter_block = self
                        .sstable
                        .filter_reader
                        .get_block_raw_filter(self.next_block_index);
                    self.next_block_index += 1;
                    return Ok(Some((data, filter_block, meta)));
                }

                Ok(None) => break,

                Err(e) => {
                    if !e.is_object_error() || self.io_retry_times >= self.max_io_retry_times {
                        return Err(e);
                    }

                    self.block_stream.take();
                    self.io_retry_times += 1;
                    fail_point!("create_stream_err");

                    tracing::warn!(
                        "fast compact retry create stream for sstable {} times, sstinfo={}",
                        self.io_retry_times,
                        format!(
                            "object_id={}, sst_id={}, meta_offset={}, table_ids={:?}",
                            self.sstable_info.object_id,
                            self.sstable_info.sst_id,
                            self.sstable_info.meta_offset,
                            self.sstable_info.table_ids
                        )
                    );
                }
            }
        }

        self.next_block_index = self.sstable.meta.block_metas.len();
        self.iter.take();
        Ok(None)
    }

    pub(crate) fn init_block_iter(
        &mut self,
        buf: Bytes,
        uncompressed_capacity: usize,
    ) -> HummockResult<()> {
        let block = Block::decode(buf, uncompressed_capacity)?;
        let mut iter = BlockIterator::new(BlockHolder::from_owned_block(Box::new(block)));
        iter.seek_to_first();
        self.iter = Some(iter);
        Ok(())
    }

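    /// Returns the smallest key of the block that would be downloaded next.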
    fn next_block_smallest(&self) -> &[u8] {
        self.sstable.meta.block_metas[self.next_block_index]
            .smallest_key
            .as_ref()
    }

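    /// Returns an upper bound for the keys of the next block: the smallest key of the block after
    /// it, or the SST's largest key if the next block is the last one.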
    fn next_block_largest(&self) -> &[u8] {
        if self.next_block_index + 1 < self.sstable.meta.block_metas.len() {
            self.sstable.meta.block_metas[self.next_block_index + 1]
                .smallest_key
                .as_ref()
        } else {
            self.sstable.meta.largest_key.as_ref()
        }
    }

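    /// Returns the largest key bound of the block that was downloaded most recently: the smallest
    /// key of the following block with a max epoch (so that key itself is excluded), or the SST's
    /// largest key for the last block.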
    fn current_block_largest(&self) -> Vec<u8> {
        if self.next_block_index < self.sstable.meta.block_metas.len() {
            let mut largest_key = FullKey::decode(
                self.sstable.meta.block_metas[self.next_block_index]
                    .smallest_key
                    .as_ref(),
            );
            // Do not include this key, because it is the smallest key of the next block.
            largest_key.epoch_with_gap = EpochWithGap::new_max_epoch();
            largest_key.encode()
        } else {
            self.sstable.meta.largest_key.clone()
        }
    }

    fn key(&self) -> FullKey<&[u8]> {
        match self.iter.as_ref() {
            Some(iter) => iter.key(),
            None => FullKey::decode(
                self.sstable.meta.block_metas[self.next_block_index]
                    .smallest_key
                    .as_ref(),
            ),
        }
    }

    pub(crate) fn is_valid(&self) -> bool {
        self.iter.is_some() || self.next_block_index < self.sstable.meta.block_metas.len()
    }

    #[cfg(test)]
    #[cfg(feature = "failpoints")]
    pub(crate) fn iter_mut(&mut self) -> &mut BlockIterator {
        self.iter.as_mut().unwrap()
    }
}

impl Drop for BlockStreamIterator {
    fn drop(&mut self) {
        self.task_progress.dec_num_pending_read_io();
    }
}

/// Iterates over the KV-pairs of a given list of SSTs. The key ranges of these SSTs are assumed to
/// be consecutive and non-overlapping.
pub struct ConcatSstableIterator {
    /// The iterator of the current table.
    sstable_iter: Option<BlockStreamIterator>,

    /// Current table index.
    cur_idx: usize,

    /// All non-overlapping tables.
    sstables: Vec<SstableInfo>,

    sstable_store: SstableStoreRef,

    stats: StoreLocalStatistic,
    task_progress: Arc<TaskProgress>,

    max_io_retry_times: usize,
}

impl ConcatSstableIterator {
    /// The caller should make sure that the SSTs in `sst_infos` are non-overlapping and arranged
    /// in ascending order of their key ranges, since this iterator only moves forward.
    pub fn new(
        sst_infos: Vec<SstableInfo>,
        sstable_store: SstableStoreRef,
        task_progress: Arc<TaskProgress>,
        max_io_retry_times: usize,
    ) -> Self {
        Self {
            sstable_iter: None,
            cur_idx: 0,
            sstables: sst_infos,
            sstable_store,
            task_progress,
            stats: StoreLocalStatistic::default(),
            max_io_retry_times,
        }
    }

    pub async fn rewind(&mut self) -> HummockResult<()> {
        self.seek_idx(0).await
    }

    pub async fn next_sstable(&mut self) -> HummockResult<()> {
        self.seek_idx(self.cur_idx + 1).await
    }

    pub fn current_sstable(&mut self) -> &mut BlockStreamIterator {
        self.sstable_iter.as_mut().unwrap()
    }

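    /// Downloads the next block of the current SST and positions a block iterator on it, unless a
    /// block iterator already exists.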
    pub async fn init_block_iter(&mut self) -> HummockResult<()> {
        if let Some(sstable) = self.sstable_iter.as_mut() {
            if sstable.iter.is_some() {
                return Ok(());
            }
            let (buf, _, meta) = sstable.download_next_block().await?.unwrap();
            sstable.init_block_iter(buf, meta.uncompressed_size as usize)?;
        }
        Ok(())
    }

    pub fn is_valid(&self) -> bool {
        self.cur_idx < self.sstables.len()
    }

    /// Resets the iterator and loads the SST at index `idx`, if one exists.
    async fn seek_idx(&mut self, idx: usize) -> HummockResult<()> {
        self.sstable_iter.take();
        self.cur_idx = idx;
        if self.cur_idx < self.sstables.len() {
            let sstable_info = &self.sstables[self.cur_idx];
            let sstable = self
                .sstable_store
                .sstable(sstable_info, &mut self.stats)
                .instrument_await("stream_iter_sstable".verbose())
                .await?;
            self.task_progress.inc_num_pending_read_io();

            let sstable_iter = BlockStreamIterator::new(
                sstable,
                self.task_progress.clone(),
                self.sstable_store.clone(),
                sstable_info.clone(),
                self.max_io_retry_times,
                self.stats.remote_io_time.clone(),
            );
            self.sstable_iter = Some(sstable_iter);
        }
        Ok(())
    }
}

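/// Fast compaction runner: merges exactly two non-overlapping runs of SSTs (`left` and `right`)
/// and, where possible, copies whole raw blocks to the output instead of decoding and re-encoding
/// every KV-pair.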
pub struct CompactorRunner<C: CompactionFilter = MultiCompactionFilter> {
    left: Box<ConcatSstableIterator>,
    right: Box<ConcatSstableIterator>,
    task_id: u64,
    executor: CompactTaskExecutor<
        RemoteBuilderFactory<UnifiedSstableWriterFactory, BlockedXor16FilterBuilder>,
        C,
    >,
    compression_algorithm: CompressionAlgorithm,
    metrics: Arc<CompactorMetrics>,
}

impl<C: CompactionFilter> CompactorRunner<C> {
    pub fn new(
        context: CompactorContext,
        task: CompactTask,
        compaction_catalog_agent_ref: CompactionCatalogAgentRef,
        object_id_getter: Arc<dyn GetObjectId>,
        task_progress: Arc<TaskProgress>,
        compaction_filter: C,
    ) -> Self {
        let mut options: SstableBuilderOptions = context.storage_opts.as_ref().into();
        let compression_algorithm: CompressionAlgorithm = task.compression_algorithm.into();
        options.compression_algorithm = compression_algorithm;
        options.capacity = task.target_file_size as usize;
        let get_id_time = Arc::new(AtomicU64::new(0));

        let key_range = KeyRange::inf();

        let task_config = TaskConfig {
            key_range,
            cache_policy: CachePolicy::NotFill,
            gc_delete_keys: task.gc_delete_keys,
            retain_multiple_version: false,
            stats_target_table_ids: Some(HashSet::from_iter(task.existing_table_ids.clone())),
            task_type: task.task_type,
            table_vnode_partition: task.table_vnode_partition.clone(),
            use_block_based_filter: true,
            table_schemas: Default::default(),
            disable_drop_column_optimization: false,
        };
        let factory = UnifiedSstableWriterFactory::new(context.sstable_store.clone());

        let builder_factory = RemoteBuilderFactory::<_, BlockedXor16FilterBuilder> {
            object_id_getter,
            limiter: context.memory_limiter.clone(),
            options,
            policy: task_config.cache_policy,
            remote_rpc_cost: get_id_time,
            compaction_catalog_agent_ref: compaction_catalog_agent_ref.clone(),
            sstable_writer_factory: factory,
            _phantom: PhantomData,
        };
        let sst_builder = CapacitySplitTableBuilder::new(
            builder_factory,
            context.compactor_metrics.clone(),
            Some(task_progress.clone()),
            task_config.table_vnode_partition.clone(),
            context
                .storage_opts
                .compactor_concurrent_uploading_sst_count,
            compaction_catalog_agent_ref.clone(),
        );
        assert_eq!(
            task.input_ssts.len(),
            2,
            "TaskId {} target_level {:?} task {:?}",
            task.task_id,
            task.target_level,
            compact_task_to_string(&task)
        );
        let left = Box::new(ConcatSstableIterator::new(
            task.input_ssts[0].table_infos.clone(),
            context.sstable_store.clone(),
            task_progress.clone(),
            context.storage_opts.compactor_iter_max_io_retry_times,
        ));
        let right = Box::new(ConcatSstableIterator::new(
            task.input_ssts[1].table_infos.clone(),
            context.sstable_store,
            task_progress.clone(),
            context.storage_opts.compactor_iter_max_io_retry_times,
        ));

        // We cannot consume the watermarks here, because they may still be used by
        // `check_compact_result`.
        let state = PkPrefixSkipWatermarkState::from_safe_epoch_watermarks(
            task.pk_prefix_table_watermarks.clone(),
        );
        let non_pk_prefix_state = NonPkPrefixSkipWatermarkState::from_safe_epoch_watermarks(
            task.non_pk_prefix_table_watermarks.clone(),
            compaction_catalog_agent_ref,
        );

        Self {
            executor: CompactTaskExecutor::new(
                sst_builder,
                task_config,
                task_progress,
                state,
                non_pk_prefix_state,
                compaction_filter,
            ),
            left,
            right,
            task_id: task.task_id,
            metrics: context.compactor_metrics,
            compression_algorithm,
        }
    }

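    /// Merges the two input streams in key order. While one side's next block lies entirely below
    /// the other side's current key (and the builder does not need to flush), the block is copied
    /// to the output verbatim via `add_raw_block` when `shall_copy_raw_block` allows it;
    /// otherwise the block is decoded and its entries are run through the executor.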
    pub async fn run(mut self) -> HummockResult<(Vec<LocalSstableInfo>, CompactionStatistics)> {
        self.left.rewind().await?;
        self.right.rewind().await?;
        let mut skip_raw_block_count = 0;
        let mut skip_raw_block_size = 0;
        while self.left.is_valid() && self.right.is_valid() {
            let ret = self
                .left
                .current_sstable()
                .key()
                .cmp(&self.right.current_sstable().key());
            let (first, second) = if ret == Ordering::Less {
                (&mut self.left, &mut self.right)
            } else {
                (&mut self.right, &mut self.left)
            };
            assert!(
                ret != Ordering::Equal,
                "sst range overlap equal_key {:?}",
                self.left.current_sstable().key()
            );
            if first.current_sstable().iter.is_none() {
                let right_key = second.current_sstable().key();
                while first.current_sstable().is_valid() && !self.executor.builder.need_flush() {
                    let full_key = FullKey::decode(first.current_sstable().next_block_largest());
                    // The full key may be either an excluded or an included bound, so we do not
                    // allow them to be equal.
                    if full_key.user_key.ge(&right_key.user_key) {
                        break;
                    }
                    let smallest_key =
                        FullKey::decode(first.current_sstable().next_block_smallest());
                    if !self.executor.shall_copy_raw_block(&smallest_key) {
                        break;
                    }
                    let smallest_key = smallest_key.to_vec();

                    let (mut block, filter_data, mut meta) = first
                        .current_sstable()
                        .download_next_block()
                        .await?
                        .unwrap();
                    let algorithm = Block::get_algorithm(&block)?;
                    if algorithm == CompressionAlgorithm::None
                        && algorithm != self.compression_algorithm
                    {
                        block = BlockBuilder::compress_block(block, self.compression_algorithm)?;
                        meta.len = block.len() as u32;
                    }

                    let largest_key = first.current_sstable().current_block_largest();
                    let block_len = block.len() as u64;
                    let block_key_count = meta.total_key_count;

                    if self
                        .executor
                        .builder
                        .add_raw_block(block, filter_data, smallest_key, largest_key, meta)
                        .await?
                    {
                        skip_raw_block_size += block_len;
                        skip_raw_block_count += 1;
                    }
                    self.executor.may_report_process_key(block_key_count);
                    self.executor.clear();
                }
                if !first.current_sstable().is_valid() {
                    first.next_sstable().await?;
                    continue;
                }
                first.init_block_iter().await?;
            }

            let target_key = second.current_sstable().key();
            let iter = first.sstable_iter.as_mut().unwrap().iter.as_mut().unwrap();
            self.executor.run(iter, target_key).await?;
            if !iter.is_valid() {
                first.sstable_iter.as_mut().unwrap().iter.take();
                if !first.current_sstable().is_valid() {
                    first.next_sstable().await?;
                }
            }
        }
        let rest_data = if !self.left.is_valid() {
            &mut self.right
        } else {
            &mut self.left
        };
        if rest_data.is_valid() {
            // Compact the remaining keys of the current block.
            let sstable_iter = rest_data.sstable_iter.as_mut().unwrap();
            let target_key = FullKey::decode(&sstable_iter.sstable.meta.largest_key);
            if let Some(iter) = sstable_iter.iter.as_mut() {
                self.executor.run(iter, target_key).await?;
                assert!(
                    !iter.is_valid(),
                    "iter should not be valid key {:?}",
                    iter.key()
                );
            }
            sstable_iter.iter.take();
        }

        while rest_data.is_valid() {
            let mut sstable_iter = rest_data.sstable_iter.take().unwrap();
            while sstable_iter.is_valid() {
                let smallest_key = FullKey::decode(sstable_iter.next_block_smallest()).to_vec();
                let (block, filter_data, block_meta) =
                    sstable_iter.download_next_block().await?.unwrap();
                // If the last key is a tombstone that was dropped and the first key of this block
                // shares its user key, that key must be dropped as well, so we cannot copy this
                // block directly.
                let need_deleted = self.executor.last_key.user_key.eq(&smallest_key.user_key)
                    && self.executor.last_key_is_delete;
                if self.executor.builder.need_flush()
                    || need_deleted
                    || !self.executor.shall_copy_raw_block(&smallest_key.to_ref())
                {
                    let largest_key = sstable_iter.sstable.meta.largest_key.clone();
                    let target_key = FullKey::decode(&largest_key);
                    sstable_iter.init_block_iter(block, block_meta.uncompressed_size as usize)?;
                    let mut iter = sstable_iter.iter.take().unwrap();
                    self.executor.run(&mut iter, target_key).await?;
                } else {
                    let largest_key = sstable_iter.current_block_largest();
                    let block_len = block.len() as u64;
                    let block_key_count = block_meta.total_key_count;
                    if self
                        .executor
                        .builder
                        .add_raw_block(block, filter_data, smallest_key, largest_key, block_meta)
                        .await?
                    {
                        skip_raw_block_count += 1;
                        skip_raw_block_size += block_len;
                    }
                    self.executor.may_report_process_key(block_key_count);
                    self.executor.clear();
                }
            }
            rest_data.next_sstable().await?;
        }
        let mut total_read_bytes = 0;
        for sst in &self.left.sstables {
            total_read_bytes += sst.sst_size;
        }
        for sst in &self.right.sstables {
            total_read_bytes += sst.sst_size;
        }
        self.metrics
            .compact_fast_runner_bytes
            .inc_by(skip_raw_block_size);
        tracing::info!(
            "OPTIMIZATION: skip {} blocks for task-{}, optimize {}% data compression",
            skip_raw_block_count,
            self.task_id,
            skip_raw_block_size * 100 / total_read_bytes,
        );

        let statistic = self.executor.take_statistics();
        let output_ssts = self.executor.builder.finish().await?;
        Compactor::report_progress(
            self.metrics.clone(),
            Some(self.executor.task_progress.clone()),
            &output_ssts,
            false,
        );
        let sst_infos = output_ssts
            .iter()
            .map(|sst| sst.sst_info.clone())
            .collect_vec();
        assert!(can_concat(&sst_infos));
        Ok((output_ssts, statistic))
    }
}

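/// Consumes decoded KV-pairs: applies delete-key GC, the compaction filter and table watermarks,
/// maintains per-table drop statistics, and feeds the surviving entries into the SST builder.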
pub struct CompactTaskExecutor<F: TableBuilderFactory, C: CompactionFilter> {
    last_key: FullKey<Vec<u8>>,
    compaction_statistics: CompactionStatistics,
    last_table_id: Option<TableId>,
    last_table_stats: TableStats,
    builder: CapacitySplitTableBuilder<F>,
    task_config: TaskConfig,
    task_progress: Arc<TaskProgress>,
    skip_watermark_state: PkPrefixSkipWatermarkState,
    last_key_is_delete: bool,
    progress_key_num: u32,
    non_pk_prefix_skip_watermark_state: NonPkPrefixSkipWatermarkState,
    compaction_filter: C,
}

impl<F: TableBuilderFactory, C: CompactionFilter> CompactTaskExecutor<F, C> {
    pub fn new(
        builder: CapacitySplitTableBuilder<F>,
        task_config: TaskConfig,
        task_progress: Arc<TaskProgress>,
        skip_watermark_state: PkPrefixSkipWatermarkState,
        non_pk_prefix_skip_watermark_state: NonPkPrefixSkipWatermarkState,
        compaction_filter: C,
    ) -> Self {
        Self {
            builder,
            task_config,
            last_key: FullKey::default(),
            last_key_is_delete: false,
            compaction_statistics: CompactionStatistics::default(),
            last_table_id: None,
            last_table_stats: TableStats::default(),
            task_progress,
            skip_watermark_state,
            progress_key_num: 0,
            non_pk_prefix_skip_watermark_state,
            compaction_filter,
        }
    }

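    /// Flushes the pending per-table stats into the compaction statistics and returns them.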
    fn take_statistics(&mut self) -> CompactionStatistics {
        if let Some(last_table_id) = self.last_table_id.take() {
            self.compaction_statistics
                .delta_drop_stat
                .insert(last_table_id, std::mem::take(&mut self.last_table_stats));
        }
        std::mem::take(&mut self.compaction_statistics)
    }

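    /// Resets the tracked last key and its tombstone flag, e.g. after a raw block has been copied.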
    fn clear(&mut self) {
        if !self.last_key.is_empty() {
            self.last_key = FullKey::default();
        }
        self.last_key_is_delete = false;
    }

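    /// Accumulates the number of processed keys and reports them to the task progress once the
    /// batch exceeds `PROGRESS_KEY_INTERVAL`.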
    #[inline(always)]
    fn may_report_process_key(&mut self, key_count: u32) {
        const PROGRESS_KEY_INTERVAL: u32 = 100;
        self.progress_key_num += key_count;
        if self.progress_key_num > PROGRESS_KEY_INTERVAL {
            self.task_progress
                .inc_progress_key(self.progress_key_num as u64);
            self.progress_key_num = 0;
        }
    }

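    /// Drains `iter` up to and including `target_key`: drops stale versions, GC-able delete
    /// tombstones, keys removed by the compaction filter and keys below their table watermark,
    /// and appends the surviving entries to the builder.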
    pub async fn run(
        &mut self,
        iter: &mut BlockIterator,
        target_key: FullKey<&[u8]>,
    ) -> HummockResult<()> {
        self.skip_watermark_state.reset_watermark();
        self.non_pk_prefix_skip_watermark_state.reset_watermark();

        while iter.is_valid() && iter.key().le(&target_key) {
            let is_new_user_key =
                !self.last_key.is_empty() && iter.key().user_key != self.last_key.user_key.as_ref();
            self.compaction_statistics.iter_total_key_counts += 1;
            self.may_report_process_key(1);

            let mut drop = false;
            let value = HummockValue::from_slice(iter.value()).unwrap();
            let is_first_or_new_user_key = is_new_user_key || self.last_key.is_empty();
            if is_first_or_new_user_key {
                self.last_key.set(iter.key());
                self.last_key_is_delete = false;
            }

            // See note in `compactor_runner.rs`.
            if !self.task_config.retain_multiple_version
                && self.task_config.gc_delete_keys
                && value.is_delete()
            {
                drop = true;
                self.last_key_is_delete = true;
            } else if !self.task_config.retain_multiple_version && !is_first_or_new_user_key {
                drop = true;
            }

            if !drop && self.compaction_filter.should_delete(iter.key()) {
                drop = true;
            }

            if !drop && self.watermark_should_delete(&iter.key()) {
                drop = true;
                self.last_key_is_delete = true;
            }

            if self.last_table_id != Some(self.last_key.user_key.table_id) {
                if let Some(last_table_id) = self.last_table_id.take() {
                    self.compaction_statistics
                        .delta_drop_stat
                        .insert(last_table_id, std::mem::take(&mut self.last_table_stats));
                }
                self.last_table_id = Some(self.last_key.user_key.table_id);
            }

            if drop {
                self.compaction_statistics.iter_drop_key_counts += 1;

                let should_count = match self.task_config.stats_target_table_ids.as_ref() {
                    Some(target_table_ids) => {
                        target_table_ids.contains(&self.last_key.user_key.table_id)
                    }
                    None => true,
                };
                if should_count {
                    self.last_table_stats.total_key_count -= 1;
                    self.last_table_stats.total_key_size -= self.last_key.encoded_len() as i64;
                    self.last_table_stats.total_value_size -= value.encoded_len() as i64;
                }
                iter.next();
                continue;
            }
            self.builder
                .add_full_key(iter.key(), value, is_new_user_key)
                .await?;
            iter.next();
        }
        Ok(())
    }

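    /// Returns whether the block starting at `smallest_key` can be copied to the output verbatim,
    /// i.e. it is not affected by a pending delete of the same user key, table watermarks, or the
    /// compaction filter.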
    pub fn shall_copy_raw_block(&mut self, smallest_key: &FullKey<&[u8]>) -> bool {
        if self.last_key_is_delete && self.last_key.user_key.as_ref().eq(&smallest_key.user_key) {
            // If the last key is a delete tombstone, we cannot append the original block, because
            // a deleted key could become visible to the user again.
            return false;
        }

        if self.watermark_should_delete(smallest_key) {
            return false;
        }

        // Check the compaction filter.
        if self.compaction_filter.should_delete(*smallest_key) {
            return false;
        }

        true
    }

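    /// Returns whether `key` falls below either the pk-prefix or the non-pk-prefix table
    /// watermark and should therefore be dropped.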
    fn watermark_should_delete(&mut self, key: &FullKey<&[u8]>) -> bool {
        (self.skip_watermark_state.has_watermark() && self.skip_watermark_state.should_delete(key))
            || (self.non_pk_prefix_skip_watermark_state.has_watermark()
                && self.non_pk_prefix_skip_watermark_state.should_delete(key))
    }