risingwave_storage/hummock/compactor/fast_compactor_runner.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::cmp::Ordering;
use std::collections::HashSet;
use std::marker::PhantomData;
use std::sync::atomic::AtomicU64;
use std::sync::{Arc, atomic};
use std::time::Instant;

use await_tree::{InstrumentAwait, SpanExt};
use bytes::Bytes;
use fail::fail_point;
use itertools::Itertools;
use risingwave_hummock_sdk::compact_task::CompactTask;
use risingwave_hummock_sdk::key::FullKey;
use risingwave_hummock_sdk::key_range::KeyRange;
use risingwave_hummock_sdk::sstable_info::SstableInfo;
use risingwave_hummock_sdk::table_stats::TableStats;
use risingwave_hummock_sdk::{EpochWithGap, LocalSstableInfo, can_concat, compact_task_to_string};

use crate::compaction_catalog_manager::CompactionCatalogAgentRef;
use crate::hummock::block_stream::BlockDataStream;
use crate::hummock::compactor::task_progress::TaskProgress;
use crate::hummock::compactor::{
    CompactionFilter, CompactionStatistics, Compactor, CompactorContext, MultiCompactionFilter,
    RemoteBuilderFactory, TaskConfig,
};
use crate::hummock::iterator::{
    NonPkPrefixSkipWatermarkState, PkPrefixSkipWatermarkState, SkipWatermarkState,
};
use crate::hummock::multi_builder::{CapacitySplitTableBuilder, TableBuilderFactory};
use crate::hummock::sstable_store::SstableStoreRef;
use crate::hummock::value::HummockValue;
use crate::hummock::{
    Block, BlockBuilder, BlockHolder, BlockIterator, BlockMeta, BlockedXor16FilterBuilder,
    CachePolicy, CompressionAlgorithm, GetObjectId, HummockResult, SstableBuilderOptions,
    TableHolder, UnifiedSstableWriterFactory,
};
use crate::monitor::{CompactorMetrics, StoreLocalStatistic};

/// Iterates over the KV-pairs of an SST while downloading it.
pub struct BlockStreamIterator {
    /// The downloading stream.
    block_stream: Option<BlockDataStream>,

    next_block_index: usize,

    /// For key sanity checks of divided SSTs and for debugging.
    sstable: TableHolder,
    iter: Option<BlockIterator>,
    task_progress: Arc<TaskProgress>,

    // For recreating the block stream on retry.
    sstable_store: SstableStoreRef,
    sstable_info: SstableInfo,
    io_retry_times: usize,
    max_io_retry_times: usize,
    stats_ptr: Arc<AtomicU64>,
}

impl BlockStreamIterator {
    // We have to handle two internal iterators:
    //   `block_stream`: iterates over the blocks of the table.
    //     `block_iter`: iterates over the KV-pairs of the current block.
    // These iterators work in different ways.

    // BlockIterator works as follows: after new(), we call seek(), which brings us
    // to the first element. Calling next() then brings us to the second element and does not
    // return anything.

    // BlockStream follows a different approach: after new(), we do not seek; instead, next()
    // returns the first value.

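    // A minimal usage sketch of the two protocols, for illustration only
    // (`holder` and `stream` are assumed placeholders, not names from this file):
    //
    //     // BlockIterator: seek first, then read, then advance.
    //     let mut iter = BlockIterator::new(holder);
    //     iter.seek_to_first();
    //     while iter.is_valid() {
    //         let (_key, _value) = (iter.key(), iter.value());
    //         iter.next();
    //     }
    //
    //     // BlockDataStream: no seek; each call to next_block_impl() yields the next block.
    //     while let Some((data, _size)) = stream.next_block_impl().await? {
    //         // decode `data` into a `Block` and iterate over it
    //     }
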
    /// Initialises a new [`BlockStreamIterator`] which iterates over the blocks of the given
    /// sstable. The underlying [`BlockDataStream`] is created lazily on the first block download.
    pub fn new(
        sstable: TableHolder,
        task_progress: Arc<TaskProgress>,
        sstable_store: SstableStoreRef,
        sstable_info: SstableInfo,
        max_io_retry_times: usize,
        stats_ptr: Arc<AtomicU64>,
    ) -> Self {
        Self {
            block_stream: None,
            next_block_index: 0,
            sstable,
            iter: None,
            task_progress,
            sstable_store,
            sstable_info,
            io_retry_times: 0,
            max_io_retry_times,
            stats_ptr,
        }
    }

    async fn create_stream(&mut self) -> HummockResult<()> {
        // Fast compaction only supports single-table compaction (the SST is not split),
        // so we don't need to filter `block_metas` by table_id or key_range.
        let block_stream = self
            .sstable_store
            .get_stream_for_blocks(
                self.sstable_info.object_id,
                &self.sstable.meta.block_metas[self.next_block_index..],
            )
            .instrument_await("stream_iter_get_stream".verbose())
            .await?;
        self.block_stream = Some(block_stream);
        Ok(())
    }

    /// Wrapper around `BlockDataStream::next_block_impl()` that measures the time spent on the download.
    pub(crate) async fn download_next_block(
        &mut self,
    ) -> HummockResult<Option<(Bytes, Vec<u8>, BlockMeta)>> {
        let now = Instant::now();
        let _time_stat = scopeguard::guard(self.stats_ptr.clone(), |stats_ptr: Arc<AtomicU64>| {
            let add = (now.elapsed().as_secs_f64() * 1000.0).ceil();
            stats_ptr.fetch_add(add as u64, atomic::Ordering::Relaxed);
        });
        loop {
            let ret = match &mut self.block_stream {
                Some(block_stream) => block_stream.next_block_impl().await,
                None => {
                    self.create_stream().await?;
                    continue;
                }
            };
            match ret {
                Ok(Some((data, _))) => {
                    let meta = self.sstable.meta.block_metas[self.next_block_index].clone();
                    let filter_block = self
                        .sstable
                        .filter_reader
                        .get_block_raw_filter(self.next_block_index);
                    self.next_block_index += 1;
                    return Ok(Some((data, filter_block, meta)));
                }

                Ok(None) => break,

                Err(e) => {
                    if !e.is_object_error() || self.io_retry_times >= self.max_io_retry_times {
                        return Err(e);
                    }

                    self.block_stream.take();
                    self.io_retry_times += 1;
                    fail_point!("create_stream_err");

                    tracing::warn!(
                        "fast compact retried creating stream for sstable {} times, sstinfo={}",
                        self.io_retry_times,
                        format!(
                            "object_id={}, sst_id={}, meta_offset={}, table_ids={:?}",
                            self.sstable_info.object_id,
                            self.sstable_info.sst_id,
                            self.sstable_info.meta_offset,
                            self.sstable_info.table_ids
                        )
                    );
                }
            }
        }

        self.next_block_index = self.sstable.meta.block_metas.len();
        self.iter.take();
        Ok(None)
    }

    pub(crate) fn init_block_iter(
        &mut self,
        buf: Bytes,
        uncompressed_capacity: usize,
    ) -> HummockResult<()> {
        let block = Block::decode(buf, uncompressed_capacity)?;
        let mut iter = BlockIterator::new(BlockHolder::from_owned_block(Box::new(block)));
        iter.seek_to_first();
        self.iter = Some(iter);
        Ok(())
    }

    fn next_block_smallest(&self) -> &[u8] {
        self.sstable.meta.block_metas[self.next_block_index]
            .smallest_key
            .as_ref()
    }

    fn next_block_largest(&self) -> &[u8] {
        if self.next_block_index + 1 < self.sstable.meta.block_metas.len() {
            self.sstable.meta.block_metas[self.next_block_index + 1]
                .smallest_key
                .as_ref()
        } else {
            self.sstable.meta.largest_key.as_ref()
        }
    }

    fn current_block_largest(&self) -> Vec<u8> {
        if self.next_block_index < self.sstable.meta.block_metas.len() {
            let mut largest_key = FullKey::decode(
                self.sstable.meta.block_metas[self.next_block_index]
                    .smallest_key
                    .as_ref(),
            );
            // Do not include this key, because it is the smallest key of the next block.
            largest_key.epoch_with_gap = EpochWithGap::new_max_epoch();
            largest_key.encode()
        } else {
            self.sstable.meta.largest_key.clone()
        }
    }

    fn key(&self) -> FullKey<&[u8]> {
        match self.iter.as_ref() {
            Some(iter) => iter.key(),
            None => FullKey::decode(
                self.sstable.meta.block_metas[self.next_block_index]
                    .smallest_key
                    .as_ref(),
            ),
        }
    }

    pub(crate) fn is_valid(&self) -> bool {
        self.iter.is_some() || self.next_block_index < self.sstable.meta.block_metas.len()
    }

    #[cfg(test)]
    #[cfg(feature = "failpoints")]
    pub(crate) fn iter_mut(&mut self) -> &mut BlockIterator {
        self.iter.as_mut().unwrap()
    }
}

impl Drop for BlockStreamIterator {
    fn drop(&mut self) {
        self.task_progress.dec_num_pending_read_io();
    }
}

/// Iterates over the KV-pairs of a given list of SSTs. The key-ranges of these SSTs are assumed to
/// be consecutive and non-overlapping.
pub struct ConcatSstableIterator {
    /// The iterator of the current table.
    sstable_iter: Option<BlockStreamIterator>,

    /// Current table index.
    cur_idx: usize,

    /// All non-overlapping tables.
    sstables: Vec<SstableInfo>,

    sstable_store: SstableStoreRef,

    stats: StoreLocalStatistic,
    task_progress: Arc<TaskProgress>,

    max_io_retry_times: usize,
}

impl ConcatSstableIterator {
    /// Caller should make sure that `sst_infos` are non-overlapping,
    /// arranged in ascending order when it serves as a forward iterator,
    /// and arranged in descending order when it serves as a backward iterator.
    pub fn new(
        sst_infos: Vec<SstableInfo>,
        sstable_store: SstableStoreRef,
        task_progress: Arc<TaskProgress>,
        max_io_retry_times: usize,
    ) -> Self {
        Self {
            sstable_iter: None,
            cur_idx: 0,
            sstables: sst_infos,
            sstable_store,
            task_progress,
            stats: StoreLocalStatistic::default(),
            max_io_retry_times,
        }
    }

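    // A hedged usage sketch of how this iterator is typically driven (illustration only; the
    // construction arguments are assumed placeholders, not values from this file):
    //
    //     let mut iter =
    //         ConcatSstableIterator::new(sst_infos, sstable_store, task_progress, max_retry);
    //     iter.rewind().await?;
    //     while iter.is_valid() {
    //         iter.init_block_iter().await?;
    //         // ... consume iter.current_sstable() block by block ...
    //         iter.next_sstable().await?;
    //     }
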
    pub async fn rewind(&mut self) -> HummockResult<()> {
        self.seek_idx(0).await
    }

    pub async fn next_sstable(&mut self) -> HummockResult<()> {
        self.seek_idx(self.cur_idx + 1).await
    }

    pub fn current_sstable(&mut self) -> &mut BlockStreamIterator {
        self.sstable_iter.as_mut().unwrap()
    }

    pub async fn init_block_iter(&mut self) -> HummockResult<()> {
        if let Some(sstable) = self.sstable_iter.as_mut() {
            if sstable.iter.is_some() {
                return Ok(());
            }
            let (buf, _, meta) = sstable.download_next_block().await?.unwrap();
            sstable.init_block_iter(buf, meta.uncompressed_size as usize)?;
        }
        Ok(())
    }

    pub fn is_valid(&self) -> bool {
        self.cur_idx < self.sstables.len()
    }

    /// Resets the iterator and loads the SST at index `idx`, if it exists.
    async fn seek_idx(&mut self, idx: usize) -> HummockResult<()> {
        self.sstable_iter.take();
        self.cur_idx = idx;
        if self.cur_idx < self.sstables.len() {
            let sstable_info = &self.sstables[self.cur_idx];
            let sstable = self
                .sstable_store
                .sstable(sstable_info, &mut self.stats)
                .instrument_await("stream_iter_sstable".verbose())
                .await?;
            self.task_progress.inc_num_pending_read_io();

            let sstable_iter = BlockStreamIterator::new(
                sstable,
                self.task_progress.clone(),
                self.sstable_store.clone(),
                sstable_info.clone(),
                self.max_io_retry_times,
                self.stats.remote_io_time.clone(),
            );
            self.sstable_iter = Some(sstable_iter);
        }
        Ok(())
    }
}

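/// Fast compaction runner. It merges exactly two non-overlapping input levels and, whenever a
/// whole block needs neither key-level rewriting nor recompression, copies the raw block into the
/// output directly instead of re-encoding its KV-pairs.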
pub struct CompactorRunner<C: CompactionFilter = MultiCompactionFilter> {
    left: Box<ConcatSstableIterator>,
    right: Box<ConcatSstableIterator>,
    task_id: u64,
    executor: CompactTaskExecutor<
        RemoteBuilderFactory<UnifiedSstableWriterFactory, BlockedXor16FilterBuilder>,
        C,
    >,
    compression_algorithm: CompressionAlgorithm,
    metrics: Arc<CompactorMetrics>,
}

impl<C: CompactionFilter> CompactorRunner<C> {
    pub fn new(
        context: CompactorContext,
        task: CompactTask,
        compaction_catalog_agent_ref: CompactionCatalogAgentRef,
        object_id_getter: Arc<dyn GetObjectId>,
        task_progress: Arc<TaskProgress>,
        compaction_filter: C,
    ) -> Self {
        let mut options: SstableBuilderOptions = context.storage_opts.as_ref().into();
        let compression_algorithm: CompressionAlgorithm = task.compression_algorithm.into();
        options.compression_algorithm = compression_algorithm;
        options.capacity = task.target_file_size as usize;
        let get_id_time = Arc::new(AtomicU64::new(0));

        let key_range = KeyRange::inf();

        let task_config = TaskConfig {
            key_range,
            cache_policy: CachePolicy::NotFill,
            gc_delete_keys: task.gc_delete_keys,
            retain_multiple_version: false,
            stats_target_table_ids: Some(HashSet::from_iter(task.existing_table_ids.clone())),
            task_type: task.task_type,
            table_vnode_partition: task.table_vnode_partition.clone(),
            use_block_based_filter: true,
            table_schemas: Default::default(),
            disable_drop_column_optimization: false,
        };
        let factory = UnifiedSstableWriterFactory::new(context.sstable_store.clone());

        let builder_factory = RemoteBuilderFactory::<_, BlockedXor16FilterBuilder> {
            object_id_getter,
            limiter: context.memory_limiter.clone(),
            options,
            policy: task_config.cache_policy,
            remote_rpc_cost: get_id_time,
            compaction_catalog_agent_ref: compaction_catalog_agent_ref.clone(),
            sstable_writer_factory: factory,
            _phantom: PhantomData,
        };
        let sst_builder = CapacitySplitTableBuilder::new(
            builder_factory,
            context.compactor_metrics.clone(),
            Some(task_progress.clone()),
            task_config.table_vnode_partition.clone(),
            context
                .storage_opts
                .compactor_concurrent_uploading_sst_count,
            compaction_catalog_agent_ref.clone(),
        );
        assert_eq!(
            task.input_ssts.len(),
            2,
            "TaskId {} target_level {:?} task {:?}",
            task.task_id,
            task.target_level,
            compact_task_to_string(&task)
        );
        let left = Box::new(ConcatSstableIterator::new(
            task.input_ssts[0].table_infos.clone(),
            context.sstable_store.clone(),
            task_progress.clone(),
            context.storage_opts.compactor_iter_max_io_retry_times,
        ));
        let right = Box::new(ConcatSstableIterator::new(
            task.input_ssts[1].table_infos.clone(),
            context.sstable_store,
            task_progress.clone(),
            context.storage_opts.compactor_iter_max_io_retry_times,
        ));

        // Cannot consume the watermarks here, because they may still be used by `check_compact_result`.
        let state = PkPrefixSkipWatermarkState::from_safe_epoch_watermarks(
            task.pk_prefix_table_watermarks.clone(),
        );
        let non_pk_prefix_state = NonPkPrefixSkipWatermarkState::from_safe_epoch_watermarks(
            task.non_pk_prefix_table_watermarks.clone(),
            compaction_catalog_agent_ref,
        );

        Self {
            executor: CompactTaskExecutor::new(
                sst_builder,
                task_config,
                task_progress,
                state,
                non_pk_prefix_state,
                compaction_filter,
            ),
            left,
            right,
            task_id: task.task_id,
            metrics: context.compactor_metrics.clone(),
            compression_algorithm,
        }
    }

    pub async fn run(mut self) -> HummockResult<(Vec<LocalSstableInfo>, CompactionStatistics)> {
        self.left.rewind().await?;
        self.right.rewind().await?;
        let mut skip_raw_block_count = 0;
        let mut skip_raw_block_size = 0;
        while self.left.is_valid() && self.right.is_valid() {
            let ret = self
                .left
                .current_sstable()
                .key()
                .cmp(&self.right.current_sstable().key());
            let (first, second) = if ret == Ordering::Less {
                (&mut self.left, &mut self.right)
            } else {
                (&mut self.right, &mut self.left)
            };
            assert!(
                ret != Ordering::Equal,
                "sst range overlap equal_key {:?}",
                self.left.current_sstable().key()
            );
            if first.current_sstable().iter.is_none() {
                let right_key = second.current_sstable().key();
                while first.current_sstable().is_valid() && !self.executor.builder.need_flush() {
                    let full_key = FullKey::decode(first.current_sstable().next_block_largest());
                    // The full key may be either an excluded or an included bound, so we do not
                    // allow it to equal `right_key`.
                    if full_key.user_key.ge(&right_key.user_key) {
                        break;
                    }
                    let smallest_key =
                        FullKey::decode(first.current_sstable().next_block_smallest());
                    if !self.executor.shall_copy_raw_block(&smallest_key) {
                        break;
                    }
                    let smallest_key = smallest_key.to_vec();

                    let (mut block, filter_data, mut meta) = first
                        .current_sstable()
                        .download_next_block()
                        .await?
                        .unwrap();
                    let algorithm = Block::get_algorithm(&block)?;
                    if algorithm == CompressionAlgorithm::None
                        && algorithm != self.compression_algorithm
                    {
                        block = BlockBuilder::compress_block(block, self.compression_algorithm)?;
                        meta.len = block.len() as u32;
                    }

                    let largest_key = first.current_sstable().current_block_largest();
                    let block_len = block.len() as u64;
                    let block_key_count = meta.total_key_count;

                    if self
                        .executor
                        .builder
                        .add_raw_block(block, filter_data, smallest_key, largest_key, meta)
                        .await?
                    {
                        skip_raw_block_size += block_len;
                        skip_raw_block_count += 1;
                    }
                    self.executor.may_report_process_key(block_key_count);
                    self.executor.clear();
                }
                if !first.current_sstable().is_valid() {
                    first.next_sstable().await?;
                    continue;
                }
                first.init_block_iter().await?;
            }

            let target_key = second.current_sstable().key();
            let iter = first.sstable_iter.as_mut().unwrap().iter.as_mut().unwrap();
            self.executor.run(iter, target_key).await?;
            if !iter.is_valid() {
                first.sstable_iter.as_mut().unwrap().iter.take();
                if !first.current_sstable().is_valid() {
                    first.next_sstable().await?;
                }
            }
        }
        let rest_data = if !self.left.is_valid() {
            &mut self.right
        } else {
            &mut self.left
        };
        if rest_data.is_valid() {
            // Compact the remaining keys of the current block.
            let sstable_iter = rest_data.sstable_iter.as_mut().unwrap();
            let target_key = FullKey::decode(&sstable_iter.sstable.meta.largest_key);
            if let Some(iter) = sstable_iter.iter.as_mut() {
                self.executor.run(iter, target_key).await?;
                assert!(
                    !iter.is_valid(),
                    "iter should not be valid key {:?}",
                    iter.key()
                );
            }
            sstable_iter.iter.take();
        }

        while rest_data.is_valid() {
            let mut sstable_iter = rest_data.sstable_iter.take().unwrap();
            while sstable_iter.is_valid() {
                let smallest_key = FullKey::decode(sstable_iter.next_block_smallest()).to_vec();
                let (block, filter_data, block_meta) =
                    sstable_iter.download_next_block().await?.unwrap();
                // If the last key was a delete tombstone that we dropped and this block's smallest
                // key shares the same user key, the first key of this block must be deleted as
                // well, so we cannot copy this block directly.
                let need_deleted = self.executor.last_key.user_key.eq(&smallest_key.user_key)
                    && self.executor.last_key_is_delete;
                if self.executor.builder.need_flush()
                    || need_deleted
                    || !self.executor.shall_copy_raw_block(&smallest_key.to_ref())
                {
                    let largest_key = sstable_iter.sstable.meta.largest_key.clone();
                    let target_key = FullKey::decode(&largest_key);
                    sstable_iter.init_block_iter(block, block_meta.uncompressed_size as usize)?;
                    let mut iter = sstable_iter.iter.take().unwrap();
                    self.executor.run(&mut iter, target_key).await?;
                } else {
                    let largest_key = sstable_iter.current_block_largest();
                    let block_len = block.len() as u64;
                    let block_key_count = block_meta.total_key_count;
                    if self
                        .executor
                        .builder
                        .add_raw_block(block, filter_data, smallest_key, largest_key, block_meta)
                        .await?
                    {
                        skip_raw_block_count += 1;
                        skip_raw_block_size += block_len;
                    }
                    self.executor.may_report_process_key(block_key_count);
                    self.executor.clear();
                }
            }
            rest_data.next_sstable().await?;
        }
        let mut total_read_bytes = 0;
        for sst in &self.left.sstables {
            total_read_bytes += sst.sst_size;
        }
        for sst in &self.right.sstables {
            total_read_bytes += sst.sst_size;
        }
        self.metrics
            .compact_fast_runner_bytes
            .inc_by(skip_raw_block_size);
        tracing::info!(
            "OPTIMIZATION: skipped {} blocks for task-{}, copied {}% of data without recompression",
            skip_raw_block_count,
            self.task_id,
            skip_raw_block_size * 100 / total_read_bytes,
        );

        let statistic = self.executor.take_statistics();
        let output_ssts = self.executor.builder.finish().await?;
        Compactor::report_progress(
            self.metrics.clone(),
            Some(self.executor.task_progress.clone()),
            &output_ssts,
            false,
        );
        let sst_infos = output_ssts
            .iter()
            .map(|sst| sst.sst_info.clone())
            .collect_vec();
        assert!(can_concat(&sst_infos));
        Ok((output_ssts, statistic))
    }
}

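/// Key-by-key executor used by the fast compactor when a raw block cannot be copied as a whole:
/// it applies table watermarks, the compaction filter, and multi-version GC, and feeds the
/// surviving KV-pairs into the SST builder.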
pub struct CompactTaskExecutor<F: TableBuilderFactory, C: CompactionFilter> {
    last_key: FullKey<Vec<u8>>,
    compaction_statistics: CompactionStatistics,
    last_table_id: Option<u32>,
    last_table_stats: TableStats,
    builder: CapacitySplitTableBuilder<F>,
    task_config: TaskConfig,
    task_progress: Arc<TaskProgress>,
    skip_watermark_state: PkPrefixSkipWatermarkState,
    last_key_is_delete: bool,
    progress_key_num: u32,
    non_pk_prefix_skip_watermark_state: NonPkPrefixSkipWatermarkState,
    compaction_filter: C,
}

impl<F: TableBuilderFactory, C: CompactionFilter> CompactTaskExecutor<F, C> {
    pub fn new(
        builder: CapacitySplitTableBuilder<F>,
        task_config: TaskConfig,
        task_progress: Arc<TaskProgress>,
        skip_watermark_state: PkPrefixSkipWatermarkState,
        non_pk_prefix_skip_watermark_state: NonPkPrefixSkipWatermarkState,
        compaction_filter: C,
    ) -> Self {
        Self {
            builder,
            task_config,
            last_key: FullKey::default(),
            last_key_is_delete: false,
            compaction_statistics: CompactionStatistics::default(),
            last_table_id: None,
            last_table_stats: TableStats::default(),
            task_progress,
            skip_watermark_state,
            progress_key_num: 0,
            non_pk_prefix_skip_watermark_state,
            compaction_filter,
        }
    }

    fn take_statistics(&mut self) -> CompactionStatistics {
        if let Some(last_table_id) = self.last_table_id.take() {
            self.compaction_statistics
                .delta_drop_stat
                .insert(last_table_id, std::mem::take(&mut self.last_table_stats));
        }
        std::mem::take(&mut self.compaction_statistics)
    }

    fn clear(&mut self) {
        if !self.last_key.is_empty() {
            self.last_key = FullKey::default();
        }
        self.last_key_is_delete = false;
    }

    #[inline(always)]
    fn may_report_process_key(&mut self, key_count: u32) {
        const PROGRESS_KEY_INTERVAL: u32 = 100;
        self.progress_key_num += key_count;
        if self.progress_key_num > PROGRESS_KEY_INTERVAL {
            self.task_progress
                .inc_progress_key(self.progress_key_num as u64);
            self.progress_key_num = 0;
        }
    }

    pub async fn run(
        &mut self,
        iter: &mut BlockIterator,
        target_key: FullKey<&[u8]>,
    ) -> HummockResult<()> {
        self.skip_watermark_state.reset_watermark();
        self.non_pk_prefix_skip_watermark_state.reset_watermark();

        while iter.is_valid() && iter.key().le(&target_key) {
            let is_new_user_key =
                !self.last_key.is_empty() && iter.key().user_key != self.last_key.user_key.as_ref();
            self.compaction_statistics.iter_total_key_counts += 1;
            self.may_report_process_key(1);

            let mut drop = false;
            let value = HummockValue::from_slice(iter.value()).unwrap();
            let is_first_or_new_user_key = is_new_user_key || self.last_key.is_empty();
            if is_first_or_new_user_key {
                self.last_key.set(iter.key());
                self.last_key_is_delete = false;
            }

            // See note in `compactor_runner.rs`.
            if !self.task_config.retain_multiple_version
                && self.task_config.gc_delete_keys
                && value.is_delete()
            {
                drop = true;
                self.last_key_is_delete = true;
            } else if !self.task_config.retain_multiple_version && !is_first_or_new_user_key {
                drop = true;
            }

            if !drop && self.compaction_filter.should_delete(iter.key()) {
                drop = true;
            }

            if !drop && self.watermark_should_delete(&iter.key()) {
                drop = true;
                self.last_key_is_delete = true;
            }

            if self.last_table_id != Some(self.last_key.user_key.table_id.table_id) {
                if let Some(last_table_id) = self.last_table_id.take() {
                    self.compaction_statistics
                        .delta_drop_stat
                        .insert(last_table_id, std::mem::take(&mut self.last_table_stats));
                }
                self.last_table_id = Some(self.last_key.user_key.table_id.table_id);
            }

            if drop {
                self.compaction_statistics.iter_drop_key_counts += 1;

                let should_count = match self.task_config.stats_target_table_ids.as_ref() {
                    Some(target_table_ids) => {
                        target_table_ids.contains(&self.last_key.user_key.table_id.table_id)
                    }
                    None => true,
                };
                if should_count {
                    self.last_table_stats.total_key_count -= 1;
                    self.last_table_stats.total_key_size -= self.last_key.encoded_len() as i64;
                    self.last_table_stats.total_value_size -= value.encoded_len() as i64;
                }
                iter.next();
                continue;
            }
            self.builder
                .add_full_key(iter.key(), value, is_new_user_key)
                .await?;
            iter.next();
        }
        Ok(())
    }

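    /// Returns whether the next block can be copied into the output as a raw block, i.e. none of
    /// the following applies to its smallest key: the previously emitted key was a dropped delete
    /// tombstone for the same user key, a table watermark would drop it, or the compaction filter
    /// would drop it.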
    pub fn shall_copy_raw_block(&mut self, smallest_key: &FullKey<&[u8]>) -> bool {
        if self.last_key_is_delete && self.last_key.user_key.as_ref().eq(&smallest_key.user_key) {
            // If the last key is a delete tombstone, we cannot append the original block,
            // because a deleted key could become visible to users again.
            return false;
        }

        if self.watermark_should_delete(smallest_key) {
            return false;
        }

        // Check the compaction filter.
        if self.compaction_filter.should_delete(*smallest_key) {
            return false;
        }

        true
    }

    fn watermark_should_delete(&mut self, key: &FullKey<&[u8]>) -> bool {
        (self.skip_watermark_state.has_watermark() && self.skip_watermark_state.should_delete(key))
            || (self.non_pk_prefix_skip_watermark_state.has_watermark()
                && self.non_pk_prefix_skip_watermark_state.should_delete(key))
    }
801}