// risingwave_storage/hummock/compactor/
// fast_compactor_runner.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::cmp::Ordering;
16use std::collections::HashSet;
17use std::marker::PhantomData;
18use std::sync::atomic::AtomicU64;
19use std::sync::{Arc, atomic};
20use std::time::Instant;
21
22use await_tree::{InstrumentAwait, SpanExt};
23use bytes::Bytes;
24use fail::fail_point;
25use itertools::Itertools;
26use risingwave_hummock_sdk::compact_task::CompactTask;
27use risingwave_hummock_sdk::key::FullKey;
28use risingwave_hummock_sdk::key_range::KeyRange;
29use risingwave_hummock_sdk::sstable_info::SstableInfo;
30use risingwave_hummock_sdk::table_stats::TableStats;
31use risingwave_hummock_sdk::{EpochWithGap, LocalSstableInfo, can_concat, compact_task_to_string};
32
33use crate::compaction_catalog_manager::CompactionCatalogAgentRef;
34use crate::hummock::block_stream::BlockDataStream;
35use crate::hummock::compactor::task_progress::TaskProgress;
36use crate::hummock::compactor::{
37    CompactionStatistics, Compactor, CompactorContext, RemoteBuilderFactory, TaskConfig,
38};
39use crate::hummock::iterator::{
40    NonPkPrefixSkipWatermarkState, PkPrefixSkipWatermarkState, SkipWatermarkState,
41};
42use crate::hummock::multi_builder::{CapacitySplitTableBuilder, TableBuilderFactory};
43use crate::hummock::sstable_store::SstableStoreRef;
44use crate::hummock::value::HummockValue;
45use crate::hummock::{
46    Block, BlockBuilder, BlockHolder, BlockIterator, BlockMeta, BlockedXor16FilterBuilder,
47    CachePolicy, CompressionAlgorithm, GetObjectId, HummockResult, SstableBuilderOptions,
48    TableHolder, UnifiedSstableWriterFactory,
49};
50use crate::monitor::{CompactorMetrics, StoreLocalStatistic};
51
/// Iterates over the KV-pairs of an SST while downloading it.
///
/// Blocks are fetched lazily through a [`BlockDataStream`]; when the stream
/// fails with an object-store error it is dropped and recreated (up to
/// `max_io_retry_times` times), resuming from `next_block_index`.
pub struct BlockStreamIterator {
    /// The downloading stream. Created lazily on the first
    /// `download_next_block` call and dropped on retryable IO errors.
    block_stream: Option<BlockDataStream>,

    // Index (into `sstable.meta.block_metas`) of the next block to download.
    next_block_index: usize,

    /// For key sanity check of divided SST and debugging
    sstable: TableHolder,
    // Iterator over the KV-pairs of the most recently downloaded block, if any.
    iter: Option<BlockIterator>,
    // Progress tracker; the pending-read-IO counter is decremented on drop.
    task_progress: Arc<TaskProgress>,

    // For block stream recreate
    sstable_store: SstableStoreRef,
    sstable_info: SstableInfo,
    // How many times the stream has been recreated after IO errors so far.
    io_retry_times: usize,
    // Upper bound on stream recreations before the error is propagated.
    max_io_retry_times: usize,
    // Accumulates remote IO time in milliseconds (shared with local stats).
    stats_ptr: Arc<AtomicU64>,
}
71
impl BlockStreamIterator {
    // We have to handle two internal iterators.
    //   `block_stream`: iterates over the blocks of the table.
    //     `block_iter`: iterates over the KV-pairs of the current block.
    // These iterators work in different ways.

    // BlockIterator works as follows: After new(), we call seek(). That brings us
    // to the first element. Calling next() then brings us to the second element and does not
    // return anything.

    // BlockStream follows a different approach. After new(), we do not seek, instead next()
    // returns the first value.

    /// Initialises a new [`BlockStreamIterator`] over `sstable`. The underlying
    /// [`BlockDataStream`] is created lazily on the first `download_next_block`
    /// call (and recreated on retryable IO errors).
    pub fn new(
        sstable: TableHolder,
        task_progress: Arc<TaskProgress>,
        sstable_store: SstableStoreRef,
        sstable_info: SstableInfo,
        max_io_retry_times: usize,
        stats_ptr: Arc<AtomicU64>,
    ) -> Self {
        Self {
            block_stream: None,
            next_block_index: 0,
            sstable,
            iter: None,
            task_progress,
            sstable_store,
            sstable_info,
            io_retry_times: 0,
            max_io_retry_times,
            stats_ptr,
        }
    }

    /// (Re)creates the block stream, starting at `next_block_index` so a
    /// recreated stream resumes where the previous one failed.
    async fn create_stream(&mut self) -> HummockResult<()> {
        // Fast compact only support the single table compaction.(not split sst)
        // So we don't need to filter the block_metas with table_id and key_range
        let block_stream = self
            .sstable_store
            .get_stream_for_blocks(
                self.sstable_info.object_id,
                &self.sstable.meta.block_metas[self.next_block_index..],
            )
            .instrument_await("stream_iter_get_stream".verbose())
            .await?;
        self.block_stream = Some(block_stream);
        Ok(())
    }

    /// Wrapper function for `self.block_stream.next()` which allows us to measure the time needed.
    ///
    /// Returns the next block's data, its raw filter bytes and its
    /// [`BlockMeta`], or `Ok(None)` once the stream is exhausted. On an
    /// object-store error the stream is dropped and recreated (resuming at
    /// `next_block_index`) at most `max_io_retry_times` times before the error
    /// is propagated.
    pub(crate) async fn download_next_block(
        &mut self,
    ) -> HummockResult<Option<(Bytes, Vec<u8>, BlockMeta)>> {
        let now = Instant::now();
        // Record the elapsed wall time (in milliseconds) into `stats_ptr` on
        // every exit path, including early returns and errors.
        let _time_stat = scopeguard::guard(self.stats_ptr.clone(), |stats_ptr: Arc<AtomicU64>| {
            let add = (now.elapsed().as_secs_f64() * 1000.0).ceil();
            stats_ptr.fetch_add(add as u64, atomic::Ordering::Relaxed);
        });
        loop {
            let ret = match &mut self.block_stream {
                Some(block_stream) => block_stream.next_block_impl().await,
                None => {
                    // Stream not created yet (first call) or dropped after a
                    // failure: (re)create it and retry the read.
                    self.create_stream().await?;
                    continue;
                }
            };
            match ret {
                Ok(Some((data, _))) => {
                    let meta = self.sstable.meta.block_metas[self.next_block_index].clone();
                    let filter_block = self
                        .sstable
                        .filter_reader
                        .get_block_raw_filter(self.next_block_index);
                    self.next_block_index += 1;
                    return Ok(Some((data, filter_block, meta)));
                }

                Ok(None) => break,

                Err(e) => {
                    // Only object-store errors are retryable, and only up to
                    // `max_io_retry_times` stream recreations.
                    if !e.is_object_error() || self.io_retry_times >= self.max_io_retry_times {
                        return Err(e);
                    }

                    // Drop the broken stream; the next loop iteration will
                    // recreate it starting from `next_block_index`.
                    self.block_stream.take();
                    self.io_retry_times += 1;
                    fail_point!("create_stream_err");

                    tracing::warn!(
                        "fast compact retry create stream for sstable {} times, sstinfo={}",
                        self.io_retry_times,
                        format!(
                            "object_id={}, sst_id={}, meta_offset={}, table_ids={:?}",
                            self.sstable_info.object_id,
                            self.sstable_info.sst_id,
                            self.sstable_info.meta_offset,
                            self.sstable_info.table_ids
                        )
                    );
                }
            }
        }

        // Stream exhausted: mark the iterator finished (`is_valid` becomes
        // false) and drop any in-flight block iterator.
        self.next_block_index = self.sstable.meta.block_metas.len();
        self.iter.take();
        Ok(None)
    }

    /// Decodes `buf` into a [`Block`] and stores a [`BlockIterator`]
    /// positioned on its first entry in `self.iter`.
    pub(crate) fn init_block_iter(
        &mut self,
        buf: Bytes,
        uncompressed_capacity: usize,
    ) -> HummockResult<()> {
        let block = Block::decode(buf, uncompressed_capacity)?;
        let mut iter = BlockIterator::new(BlockHolder::from_owned_block(Box::new(block)));
        iter.seek_to_first();
        self.iter = Some(iter);
        Ok(())
    }

    /// Smallest encoded full key of the next not-yet-downloaded block.
    fn next_block_smallest(&self) -> &[u8] {
        self.sstable.meta.block_metas[self.next_block_index]
            .smallest_key
            .as_ref()
    }

    /// Largest encoded full-key bound of the next block: the smallest key of
    /// the block after it, or the SST's largest key for the final block.
    fn next_block_largest(&self) -> &[u8] {
        if self.next_block_index + 1 < self.sstable.meta.block_metas.len() {
            self.sstable.meta.block_metas[self.next_block_index + 1]
                .smallest_key
                .as_ref()
        } else {
            self.sstable.meta.largest_key.as_ref()
        }
    }

    /// Largest encoded full-key bound for the block downloaded most recently
    /// (the one just before `next_block_index`).
    fn current_block_largest(&self) -> Vec<u8> {
        if self.next_block_index < self.sstable.meta.block_metas.len() {
            let mut largest_key = FullKey::decode(
                self.sstable.meta.block_metas[self.next_block_index]
                    .smallest_key
                    .as_ref(),
            );
            // do not include this key because it is the smallest key of next block.
            largest_key.epoch_with_gap = EpochWithGap::new_max_epoch();
            largest_key.encode()
        } else {
            self.sstable.meta.largest_key.clone()
        }
    }

    /// Current position: the loaded block iterator's key if one is active,
    /// otherwise the smallest key of the next pending block.
    fn key(&self) -> FullKey<&[u8]> {
        match self.iter.as_ref() {
            Some(iter) => iter.key(),
            None => FullKey::decode(
                self.sstable.meta.block_metas[self.next_block_index]
                    .smallest_key
                    .as_ref(),
            ),
        }
    }

    /// True while a block iterator is active or blocks remain to download.
    pub(crate) fn is_valid(&self) -> bool {
        self.iter.is_some() || self.next_block_index < self.sstable.meta.block_metas.len()
    }

    #[cfg(test)]
    #[cfg(feature = "failpoints")]
    pub(crate) fn iter_mut(&mut self) -> &mut BlockIterator {
        self.iter.as_mut().unwrap()
    }
}
247
impl Drop for BlockStreamIterator {
    // Balances the `inc_num_pending_read_io` performed when this iterator was
    // created in `ConcatSstableIterator::seek_idx`.
    fn drop(&mut self) {
        self.task_progress.dec_num_pending_read_io();
    }
}
253
/// Iterates over the KV-pairs of a given list of SSTs. The key-ranges of these SSTs are assumed to
/// be consecutive and non-overlapping.
pub struct ConcatSstableIterator {
    /// The iterator of the current table.
    sstable_iter: Option<BlockStreamIterator>,

    /// Current table index.
    cur_idx: usize,

    /// All non-overlapping tables.
    sstables: Vec<SstableInfo>,

    // Store used to fetch SST metadata and block streams.
    sstable_store: SstableStoreRef,

    // Local read statistics; `remote_io_time` is shared with each
    // `BlockStreamIterator` created by `seek_idx`.
    stats: StoreLocalStatistic,
    task_progress: Arc<TaskProgress>,

    // Passed to each `BlockStreamIterator` to bound stream-recreation retries.
    max_io_retry_times: usize,
}
273
274impl ConcatSstableIterator {
275    /// Caller should make sure that `tables` are non-overlapping,
276    /// arranged in ascending order when it serves as a forward iterator,
277    /// and arranged in descending order when it serves as a backward iterator.
278    pub fn new(
279        sst_infos: Vec<SstableInfo>,
280        sstable_store: SstableStoreRef,
281        task_progress: Arc<TaskProgress>,
282        max_io_retry_times: usize,
283    ) -> Self {
284        Self {
285            sstable_iter: None,
286            cur_idx: 0,
287            sstables: sst_infos,
288            sstable_store,
289            task_progress,
290            stats: StoreLocalStatistic::default(),
291            max_io_retry_times,
292        }
293    }
294
295    pub async fn rewind(&mut self) -> HummockResult<()> {
296        self.seek_idx(0).await
297    }
298
299    pub async fn next_sstable(&mut self) -> HummockResult<()> {
300        self.seek_idx(self.cur_idx + 1).await
301    }
302
303    pub fn current_sstable(&mut self) -> &mut BlockStreamIterator {
304        self.sstable_iter.as_mut().unwrap()
305    }
306
307    pub async fn init_block_iter(&mut self) -> HummockResult<()> {
308        if let Some(sstable) = self.sstable_iter.as_mut() {
309            if sstable.iter.is_some() {
310                return Ok(());
311            }
312            let (buf, _, meta) = sstable.download_next_block().await?.unwrap();
313            sstable.init_block_iter(buf, meta.uncompressed_size as usize)?;
314        }
315        Ok(())
316    }
317
318    pub fn is_valid(&self) -> bool {
319        self.cur_idx < self.sstables.len()
320    }
321
322    /// Resets the iterator, loads the specified SST, and seeks in that SST to `seek_key` if given.
323    async fn seek_idx(&mut self, idx: usize) -> HummockResult<()> {
324        self.sstable_iter.take();
325        self.cur_idx = idx;
326        if self.cur_idx < self.sstables.len() {
327            let sstable_info = &self.sstables[self.cur_idx];
328            let sstable = self
329                .sstable_store
330                .sstable(sstable_info, &mut self.stats)
331                .instrument_await("stream_iter_sstable".verbose())
332                .await?;
333            self.task_progress.inc_num_pending_read_io();
334
335            let sstable_iter = BlockStreamIterator::new(
336                sstable,
337                self.task_progress.clone(),
338                self.sstable_store.clone(),
339                sstable_info.clone(),
340                self.max_io_retry_times,
341                self.stats.remote_io_time.clone(),
342            );
343            self.sstable_iter = Some(sstable_iter);
344        }
345        Ok(())
346    }
347}
348
/// Fast compaction runner: merges exactly two non-overlapping input levels,
/// copying whole raw blocks into the output whenever possible instead of
/// re-encoding individual KV-pairs.
pub struct CompactorRunner {
    // Iterator over the SSTs of the first input level.
    left: Box<ConcatSstableIterator>,
    // Iterator over the SSTs of the second input level.
    right: Box<ConcatSstableIterator>,
    task_id: u64,
    // Feeds surviving KV-pairs / raw blocks into the output SST builder.
    executor: CompactTaskExecutor<
        RemoteBuilderFactory<UnifiedSstableWriterFactory, BlockedXor16FilterBuilder>,
    >,
    // Target compression for output blocks; uncompressed input blocks are
    // recompressed with this algorithm before being copied raw.
    compression_algorithm: CompressionAlgorithm,
    metrics: Arc<CompactorMetrics>,
}
359
impl CompactorRunner {
    /// Builds the runner from a compact task: configures the output SST
    /// builder, the two input iterators and the watermark states.
    ///
    /// # Panics
    /// Panics if the task does not have exactly two input levels.
    pub fn new(
        context: CompactorContext,
        task: CompactTask,
        compaction_catalog_agent_ref: CompactionCatalogAgentRef,
        object_id_getter: Box<dyn GetObjectId>,
        task_progress: Arc<TaskProgress>,
    ) -> Self {
        let mut options: SstableBuilderOptions = context.storage_opts.as_ref().into();
        let compression_algorithm: CompressionAlgorithm = task.compression_algorithm.into();
        options.compression_algorithm = compression_algorithm;
        options.capacity = task.target_file_size as usize;
        let get_id_time = Arc::new(AtomicU64::new(0));

        let key_range = KeyRange::inf();

        let task_config = TaskConfig {
            key_range,
            cache_policy: CachePolicy::NotFill,
            gc_delete_keys: task.gc_delete_keys,
            retain_multiple_version: false,
            stats_target_table_ids: Some(HashSet::from_iter(task.existing_table_ids.clone())),
            task_type: task.task_type,
            table_vnode_partition: task.table_vnode_partition.clone(),
            use_block_based_filter: true,
            table_schemas: Default::default(),
            disable_drop_column_optimization: false,
        };
        let factory = UnifiedSstableWriterFactory::new(context.sstable_store.clone());

        let builder_factory = RemoteBuilderFactory::<_, BlockedXor16FilterBuilder> {
            object_id_getter,
            limiter: context.memory_limiter.clone(),
            options,
            policy: task_config.cache_policy,
            remote_rpc_cost: get_id_time,
            compaction_catalog_agent_ref: compaction_catalog_agent_ref.clone(),
            sstable_writer_factory: factory,
            _phantom: PhantomData,
        };
        let sst_builder = CapacitySplitTableBuilder::new(
            builder_factory,
            context.compactor_metrics.clone(),
            Some(task_progress.clone()),
            task_config.table_vnode_partition.clone(),
            context
                .storage_opts
                .compactor_concurrent_uploading_sst_count,
            compaction_catalog_agent_ref.clone(),
        );
        // The fast path merges exactly two (non-overlapping) input levels.
        assert_eq!(
            task.input_ssts.len(),
            2,
            "TaskId {} target_level {:?} task {:?}",
            task.task_id,
            task.target_level,
            compact_task_to_string(&task)
        );
        let left = Box::new(ConcatSstableIterator::new(
            task.input_ssts[0].table_infos.clone(),
            context.sstable_store.clone(),
            task_progress.clone(),
            context.storage_opts.compactor_iter_max_io_retry_times,
        ));
        let right = Box::new(ConcatSstableIterator::new(
            task.input_ssts[1].table_infos.clone(),
            context.sstable_store,
            task_progress.clone(),
            context.storage_opts.compactor_iter_max_io_retry_times,
        ));

        // Can not consume the watermarks because the watermarks may be used by `check_compact_result`.
        let state = PkPrefixSkipWatermarkState::from_safe_epoch_watermarks(
            task.pk_prefix_table_watermarks.clone(),
        );
        let non_pk_prefix_state = NonPkPrefixSkipWatermarkState::from_safe_epoch_watermarks(
            task.non_pk_prefix_table_watermarks.clone(),
            compaction_catalog_agent_ref,
        );

        Self {
            executor: CompactTaskExecutor::new(
                sst_builder,
                task_config,
                task_progress,
                state,
                non_pk_prefix_state,
            ),
            left,
            right,
            task_id: task.task_id,
            metrics: context.compactor_metrics.clone(),
            compression_algorithm,
        }
    }

    /// Runs the two-way merge to completion and returns the output SSTs plus
    /// compaction statistics.
    ///
    /// Whole raw blocks of the side holding the smaller key are copied into
    /// the output without decoding whenever they lie strictly below the other
    /// side's current key and no tombstone/watermark forces per-key handling;
    /// otherwise keys are merged one at a time through the executor.
    pub async fn run(mut self) -> HummockResult<(Vec<LocalSstableInfo>, CompactionStatistics)> {
        self.left.rewind().await?;
        self.right.rewind().await?;
        let mut skip_raw_block_count = 0;
        let mut skip_raw_block_size = 0;
        while self.left.is_valid() && self.right.is_valid() {
            let ret = self
                .left
                .current_sstable()
                .key()
                .cmp(&self.right.current_sstable().key());
            // `first` is the side currently positioned on the smaller key.
            let (first, second) = if ret == Ordering::Less {
                (&mut self.left, &mut self.right)
            } else {
                (&mut self.right, &mut self.left)
            };
            // Inputs are non-overlapping, so equal full keys indicate a bug.
            assert!(
                ret != Ordering::Equal,
                "sst range overlap equal_key {:?}",
                self.left.current_sstable().key()
            );
            if first.current_sstable().iter.is_none() {
                // No block iterator is active on `first`: try to copy whole
                // raw blocks that lie strictly below `second`'s current key.
                let right_key = second.current_sstable().key();
                while first.current_sstable().is_valid() && !self.executor.builder.need_flush() {
                    let full_key = FullKey::decode(first.current_sstable().next_block_largest());
                    // the full key may be either Excluded key or Included key, so we do not allow
                    // they equals.
                    if full_key.user_key.ge(&right_key.user_key) {
                        break;
                    }
                    let smallest_key =
                        FullKey::decode(first.current_sstable().next_block_smallest());
                    if !self.executor.shall_copy_raw_block(&smallest_key) {
                        break;
                    }
                    let smallest_key = smallest_key.to_vec();

                    let (mut block, filter_data, mut meta) = first
                        .current_sstable()
                        .download_next_block()
                        .await?
                        .unwrap();
                    // Recompress uncompressed blocks to the task's target
                    // algorithm before copying them into the output.
                    let algorithm = Block::get_algorithm(&block)?;
                    if algorithm == CompressionAlgorithm::None
                        && algorithm != self.compression_algorithm
                    {
                        block = BlockBuilder::compress_block(block, self.compression_algorithm)?;
                        meta.len = block.len() as u32;
                    }

                    let largest_key = first.current_sstable().current_block_largest();
                    let block_len = block.len() as u64;
                    let block_key_count = meta.total_key_count;

                    if self
                        .executor
                        .builder
                        .add_raw_block(block, filter_data, smallest_key, largest_key, meta)
                        .await?
                    {
                        skip_raw_block_size += block_len;
                        skip_raw_block_count += 1;
                    }
                    self.executor.may_report_process_key(block_key_count);
                    self.executor.clear();
                }
                if !first.current_sstable().is_valid() {
                    first.next_sstable().await?;
                    continue;
                }
                // Remaining block may overlap `second`: fall back to per-key
                // merging through a block iterator.
                first.init_block_iter().await?;
            }

            // Per-key merge from `first` up to `second`'s current key.
            let target_key = second.current_sstable().key();
            let iter = first.sstable_iter.as_mut().unwrap().iter.as_mut().unwrap();
            self.executor.run(iter, target_key).await?;
            if !iter.is_valid() {
                // Block exhausted: drop the block iterator and advance to the
                // next SST once the current one is fully consumed.
                first.sstable_iter.as_mut().unwrap().iter.take();
                if !first.current_sstable().is_valid() {
                    first.next_sstable().await?;
                }
            }
        }
        // One side is exhausted; `rest_data` is whatever remains of the other.
        let rest_data = if !self.left.is_valid() {
            &mut self.right
        } else {
            &mut self.left
        };
        if rest_data.is_valid() {
            // compact rest keys of the current block.
            let sstable_iter = rest_data.sstable_iter.as_mut().unwrap();
            let target_key = FullKey::decode(&sstable_iter.sstable.meta.largest_key);
            if let Some(iter) = sstable_iter.iter.as_mut() {
                self.executor.run(iter, target_key).await?;
                assert!(
                    !iter.is_valid(),
                    "iter should not be valid key {:?}",
                    iter.key()
                );
            }
            sstable_iter.iter.take();
        }

        // Drain the remaining SSTs, copying raw blocks where safe and decoding
        // them for per-key processing otherwise.
        while rest_data.is_valid() {
            let mut sstable_iter = rest_data.sstable_iter.take().unwrap();
            while sstable_iter.is_valid() {
                let smallest_key = FullKey::decode(sstable_iter.next_block_smallest()).to_vec();
                let (block, filter_data, block_meta) =
                    sstable_iter.download_next_block().await?.unwrap();
                // If the last key is tombstone and it was deleted, the first key of this block must be deleted. So we can not move this block directly.
                let need_deleted = self.executor.last_key.user_key.eq(&smallest_key.user_key)
                    && self.executor.last_key_is_delete;
                if self.executor.builder.need_flush()
                    || need_deleted
                    || !self.executor.shall_copy_raw_block(&smallest_key.to_ref())
                {
                    // Per-key path: decode the block and run it through the
                    // executor up to the SST's largest key.
                    let largest_key = sstable_iter.sstable.meta.largest_key.clone();
                    let target_key = FullKey::decode(&largest_key);
                    sstable_iter.init_block_iter(block, block_meta.uncompressed_size as usize)?;
                    let mut iter = sstable_iter.iter.take().unwrap();
                    self.executor.run(&mut iter, target_key).await?;
                } else {
                    // Raw-copy path: append the block to the output as-is.
                    let largest_key = sstable_iter.current_block_largest();
                    let block_len = block.len() as u64;
                    let block_key_count = block_meta.total_key_count;
                    if self
                        .executor
                        .builder
                        .add_raw_block(block, filter_data, smallest_key, largest_key, block_meta)
                        .await?
                    {
                        skip_raw_block_count += 1;
                        skip_raw_block_size += block_len;
                    }
                    self.executor.may_report_process_key(block_key_count);
                    self.executor.clear();
                }
            }
            rest_data.next_sstable().await?;
        }
        let mut total_read_bytes = 0;
        for sst in &self.left.sstables {
            total_read_bytes += sst.sst_size;
        }
        for sst in &self.right.sstables {
            total_read_bytes += sst.sst_size;
        }
        self.metrics
            .compact_fast_runner_bytes
            .inc_by(skip_raw_block_size);
        tracing::info!(
            "OPTIMIZATION: skip {} blocks for task-{}, optimize {}% data compression",
            skip_raw_block_count,
            self.task_id,
            skip_raw_block_size * 100 / total_read_bytes,
        );

        let statistic = self.executor.take_statistics();
        let output_ssts = self.executor.builder.finish().await?;
        Compactor::report_progress(
            self.metrics.clone(),
            Some(self.executor.task_progress.clone()),
            &output_ssts,
            false,
        );
        let sst_infos = output_ssts
            .iter()
            .map(|sst| sst.sst_info.clone())
            .collect_vec();
        // Outputs must form an ordered, non-overlapping chain.
        assert!(can_concat(&sst_infos));
        Ok((output_ssts, statistic))
    }
}
629
/// Performs the per-key compaction work: feeds keys into `builder` while
/// applying tombstone GC, multi-version dedup and watermark-based deletion.
pub struct CompactTaskExecutor<F: TableBuilderFactory> {
    // Most recently observed full key (tracked once per user key).
    last_key: FullKey<Vec<u8>>,
    compaction_statistics: CompactionStatistics,
    // Table id of `last_key`; used to flush per-table drop stats on change.
    last_table_id: Option<u32>,
    last_table_stats: TableStats,
    builder: CapacitySplitTableBuilder<F>,
    task_config: TaskConfig,
    task_progress: Arc<TaskProgress>,
    // Watermark filter keyed on the pk prefix.
    skip_watermark_state: PkPrefixSkipWatermarkState,
    // Whether the latest version of `last_key` was dropped (delete tombstone
    // or watermark); prevents raw-block copying across that user key.
    last_key_is_delete: bool,
    // Keys processed since the last progress report; flushed in batches.
    progress_key_num: u32,
    // Watermark filter keyed on non-pk-prefix columns.
    non_pk_prefix_skip_watermark_state: NonPkPrefixSkipWatermarkState,
}
643
impl<F: TableBuilderFactory> CompactTaskExecutor<F> {
    /// Creates an executor with empty statistics and no remembered last key.
    pub fn new(
        builder: CapacitySplitTableBuilder<F>,
        task_config: TaskConfig,
        task_progress: Arc<TaskProgress>,
        skip_watermark_state: PkPrefixSkipWatermarkState,
        non_pk_prefix_skip_watermark_state: NonPkPrefixSkipWatermarkState,
    ) -> Self {
        Self {
            builder,
            task_config,
            last_key: FullKey::default(),
            last_key_is_delete: false,
            compaction_statistics: CompactionStatistics::default(),
            last_table_id: None,
            last_table_stats: TableStats::default(),
            task_progress,
            skip_watermark_state,
            progress_key_num: 0,
            non_pk_prefix_skip_watermark_state,
        }
    }

    /// Flushes the pending per-table drop stats and returns the accumulated
    /// compaction statistics, resetting the internal counters to default.
    fn take_statistics(&mut self) -> CompactionStatistics {
        if let Some(last_table_id) = self.last_table_id.take() {
            self.compaction_statistics
                .delta_drop_stat
                .insert(last_table_id, std::mem::take(&mut self.last_table_stats));
        }
        std::mem::take(&mut self.compaction_statistics)
    }

    /// Forgets the last seen key. Called after a raw block is copied so the
    /// next per-key run does not dedup against keys already copied verbatim.
    fn clear(&mut self) {
        if !self.last_key.is_empty() {
            self.last_key = FullKey::default();
        }
        self.last_key_is_delete = false;
    }

    /// Accumulates processed-key counts and reports them to the task progress
    /// in batches of ~100, avoiding an atomic update for every single key.
    #[inline(always)]
    fn may_report_process_key(&mut self, key_count: u32) {
        const PROGRESS_KEY_INTERVAL: u32 = 100;
        self.progress_key_num += key_count;
        if self.progress_key_num > PROGRESS_KEY_INTERVAL {
            self.task_progress
                .inc_progress_key(self.progress_key_num as u64);
            self.progress_key_num = 0;
        }
    }

    /// Consumes keys from `iter` up to and including `target_key`, applying
    /// tombstone GC, multi-version dedup and watermark deletion, and feeds the
    /// surviving KV-pairs into the SST builder.
    pub async fn run(
        &mut self,
        iter: &mut BlockIterator,
        target_key: FullKey<&[u8]>,
    ) -> HummockResult<()> {
        self.skip_watermark_state.reset_watermark();
        self.non_pk_prefix_skip_watermark_state.reset_watermark();

        while iter.is_valid() && iter.key().le(&target_key) {
            let is_new_user_key =
                !self.last_key.is_empty() && iter.key().user_key != self.last_key.user_key.as_ref();
            self.compaction_statistics.iter_total_key_counts += 1;
            self.may_report_process_key(1);

            let mut drop = false;
            let value = HummockValue::from_slice(iter.value()).unwrap();
            let is_first_or_new_user_key = is_new_user_key || self.last_key.is_empty();
            if is_first_or_new_user_key {
                self.last_key.set(iter.key());
                self.last_key_is_delete = false;
            }

            // See note in `compactor_runner.rs`.
            if !self.task_config.retain_multiple_version
                && self.task_config.gc_delete_keys
                && value.is_delete()
            {
                // Newest version is a tombstone and the task allows GC of
                // delete keys: drop it and remember the deletion.
                drop = true;
                self.last_key_is_delete = true;
            } else if !self.task_config.retain_multiple_version && !is_first_or_new_user_key {
                // Older version of a user key whose newest version was already
                // handled: drop it.
                drop = true;
            }

            if self.watermark_should_delete(&iter.key()) {
                drop = true;
                self.last_key_is_delete = true;
            }

            // Flush per-table drop stats whenever the table id changes.
            if self.last_table_id != Some(self.last_key.user_key.table_id.table_id) {
                if let Some(last_table_id) = self.last_table_id.take() {
                    self.compaction_statistics
                        .delta_drop_stat
                        .insert(last_table_id, std::mem::take(&mut self.last_table_stats));
                }
                self.last_table_id = Some(self.last_key.user_key.table_id.table_id);
            }

            if drop {
                self.compaction_statistics.iter_drop_key_counts += 1;

                // Only account drops against tables the task tracks stats for.
                let should_count = match self.task_config.stats_target_table_ids.as_ref() {
                    Some(target_table_ids) => {
                        target_table_ids.contains(&self.last_key.user_key.table_id.table_id)
                    }
                    None => true,
                };
                if should_count {
                    self.last_table_stats.total_key_count -= 1;
                    self.last_table_stats.total_key_size -= self.last_key.encoded_len() as i64;
                    self.last_table_stats.total_value_size -= value.encoded_len() as i64;
                }
                iter.next();
                continue;
            }
            self.builder
                .add_full_key(iter.key(), value, is_new_user_key)
                .await?;
            iter.next();
        }
        Ok(())
    }

    /// Whether the block starting at `smallest_key` may be copied verbatim
    /// into the output without per-key processing.
    pub fn shall_copy_raw_block(&mut self, smallest_key: &FullKey<&[u8]>) -> bool {
        if self.last_key_is_delete && self.last_key.user_key.as_ref().eq(&smallest_key.user_key) {
            // If the last key is delete tombstone, we can not append the origin block
            // because it would cause a deleted key could be see by user again.
            return false;
        }

        if self.watermark_should_delete(smallest_key) {
            return false;
        }

        true
    }

    /// Whether `key` is covered by a pk-prefix or non-pk-prefix watermark and
    /// therefore must be dropped.
    fn watermark_should_delete(&mut self, key: &FullKey<&[u8]>) -> bool {
        (self.skip_watermark_state.has_watermark() && self.skip_watermark_state.should_delete(key))
            || (self.non_pk_prefix_skip_watermark_state.has_watermark()
                && self.non_pk_prefix_skip_watermark_state.should_delete(key))
    }
}
785}