risingwave_storage/hummock/compactor/
fast_compactor_runner.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::cmp::Ordering;
16use std::collections::HashSet;
17use std::marker::PhantomData;
18use std::sync::atomic::AtomicU64;
19use std::sync::{Arc, atomic};
20use std::time::Instant;
21
22use await_tree::{InstrumentAwait, SpanExt};
23use bytes::Bytes;
24use fail::fail_point;
25use itertools::Itertools;
26use risingwave_common::catalog::TableId;
27use risingwave_hummock_sdk::compact_task::CompactTask;
28use risingwave_hummock_sdk::key::FullKey;
29use risingwave_hummock_sdk::key_range::KeyRange;
30use risingwave_hummock_sdk::sstable_info::SstableInfo;
31use risingwave_hummock_sdk::table_stats::TableStats;
32use risingwave_hummock_sdk::{EpochWithGap, LocalSstableInfo, can_concat, compact_task_to_string};
33use risingwave_pb::hummock::{BloomFilterType, PbSstableFilterType};
34
35use crate::compaction_catalog_manager::CompactionCatalogAgentRef;
36use crate::hummock::block_stream::BlockDataStream;
37use crate::hummock::compactor::task_progress::TaskProgress;
38use crate::hummock::compactor::{
39    CompactionFilter, CompactionStatistics, Compactor, CompactorContext, MultiCompactionFilter,
40    RemoteBuilderFactory, TaskConfig,
41};
42use crate::hummock::iterator::{
43    NonPkPrefixSkipWatermarkState, PkPrefixSkipWatermarkState, SkipWatermarkState,
44    ValueSkipWatermarkState,
45};
46use crate::hummock::multi_builder::{CapacitySplitTableBuilder, TableBuilderFactory};
47use crate::hummock::sstable_store::SstableStoreRef;
48use crate::hummock::value::HummockValue;
49use crate::hummock::{
50    Block, BlockBuilder, BlockHolder, BlockIterator, BlockMeta, BlockedXor16FilterBuilder,
51    CachePolicy, CompressionAlgorithm, GetObjectId, HummockResult, SstableBuilderOptions,
52    TableHolder, UnifiedSstableWriterFactory,
53};
54use crate::monitor::{CompactorMetrics, StoreLocalStatistic};
55
/// Iterates over the KV-pairs of an SST while downloading it.
///
/// Blocks are pulled one at a time via `download_next_block`. A downloaded block is either
/// handed to the caller as raw bytes or decoded into `iter` (see `init_block_iter`) for
/// key-by-key access.
pub struct BlockStreamIterator {
    /// The downloading stream. `None` until the first download is requested, and reset to
    /// `None` when a retryable error forces the stream to be recreated.
    block_stream: Option<BlockDataStream>,

    /// Index into `sstable.meta.block_metas` of the next block to be downloaded.
    next_block_index: usize,

    /// For key sanity check of divided SST and debugging
    /// Also the source of the block metas and raw filter data returned with each block.
    sstable: TableHolder,
    /// Iterator over the most recently decoded block, if one is currently being consumed.
    iter: Option<BlockIterator>,
    /// Progress tracker; its pending-read-IO counter is decremented when `self` is dropped.
    task_progress: Arc<TaskProgress>,

    // For block stream recreate
    sstable_store: SstableStoreRef,
    sstable_info: SstableInfo,
    /// How many times the stream has been recreated after an object error so far.
    io_retry_times: usize,
    /// Once `io_retry_times` reaches this bound, download errors are returned to the caller.
    max_io_retry_times: usize,
    /// Accumulates the time spent in `download_next_block`, in milliseconds (rounded up).
    stats_ptr: Arc<AtomicU64>,
}
75
76impl BlockStreamIterator {
77    // We have to handle two internal iterators.
78    //   `block_stream`: iterates over the blocks of the table.
79    //     `block_iter`: iterates over the KV-pairs of the current block.
80    // These iterators work in different ways.
81
82    // BlockIterator works as follows: After new(), we call seek(). That brings us
83    // to the first element. Calling next() then brings us to the second element and does not
84    // return anything.
85
86    // BlockStream follows a different approach. After new(), we do not seek, instead next()
87    // returns the first value.
88
89    /// Initialises a new [`BlockStreamIterator`] which iterates over the given [`BlockDataStream`].
90    /// The iterator reads at most `max_block_count` from the stream.
91    pub fn new(
92        sstable: TableHolder,
93        task_progress: Arc<TaskProgress>,
94        sstable_store: SstableStoreRef,
95        sstable_info: SstableInfo,
96        max_io_retry_times: usize,
97        stats_ptr: Arc<AtomicU64>,
98    ) -> Self {
99        Self {
100            block_stream: None,
101            next_block_index: 0,
102            sstable,
103            iter: None,
104            task_progress,
105            sstable_store,
106            sstable_info,
107            io_retry_times: 0,
108            max_io_retry_times,
109            stats_ptr,
110        }
111    }
112
113    async fn create_stream(&mut self) -> HummockResult<()> {
114        // Fast compaction streams the physical SST blocks directly. Table-id pruning is handled
115        // later by `CompactTaskExecutor` before raw block copy or decoded block compaction.
116        let block_stream = self
117            .sstable_store
118            .get_stream_for_blocks(
119                self.sstable_info.object_id,
120                &self.sstable.meta.block_metas[self.next_block_index..],
121            )
122            .instrument_await("stream_iter_get_stream".verbose())
123            .await?;
124        self.block_stream = Some(block_stream);
125        Ok(())
126    }
127
128    /// Wrapper function for `self.block_stream.next()` which allows us to measure the time needed.
129    pub(crate) async fn download_next_block(
130        &mut self,
131    ) -> HummockResult<Option<(Bytes, Vec<u8>, BlockMeta)>> {
132        let now = Instant::now();
133        let _time_stat = scopeguard::guard(self.stats_ptr.clone(), |stats_ptr: Arc<AtomicU64>| {
134            let add = (now.elapsed().as_secs_f64() * 1000.0).ceil();
135            stats_ptr.fetch_add(add as u64, atomic::Ordering::Relaxed);
136        });
137        loop {
138            let ret = match &mut self.block_stream {
139                Some(block_stream) => block_stream.next_block_impl().await,
140                None => {
141                    self.create_stream().await?;
142                    continue;
143                }
144            };
145            match ret {
146                Ok(Some((data, _))) => {
147                    let meta = self.sstable.meta.block_metas[self.next_block_index].clone();
148                    let filter_block = self
149                        .sstable
150                        .filter_reader
151                        .get_block_raw_filter(self.next_block_index);
152                    self.next_block_index += 1;
153                    return Ok(Some((data, filter_block, meta)));
154                }
155
156                Ok(None) => break,
157
158                Err(e) => {
159                    if !e.is_object_error() || self.io_retry_times >= self.max_io_retry_times {
160                        return Err(e);
161                    }
162
163                    self.block_stream.take();
164                    self.io_retry_times += 1;
165                    fail_point!("create_stream_err");
166
167                    tracing::warn!(
168                        "fast compact retry create stream for sstable {} times, sstinfo={}",
169                        self.io_retry_times,
170                        format!(
171                            "object_id={}, sst_id={}, meta_offset={}, table_ids={:?}",
172                            self.sstable_info.object_id,
173                            self.sstable_info.sst_id,
174                            self.sstable_info.meta_offset,
175                            self.sstable_info.table_ids
176                        )
177                    );
178                }
179            }
180        }
181
182        self.next_block_index = self.sstable.meta.block_metas.len();
183        self.iter.take();
184        Ok(None)
185    }
186
187    pub(crate) fn init_block_iter(
188        &mut self,
189        buf: Bytes,
190        uncompressed_capacity: usize,
191    ) -> HummockResult<()> {
192        let block = Block::decode(buf, uncompressed_capacity)?;
193        let mut iter = BlockIterator::new(BlockHolder::from_owned_block(Box::new(block)));
194        iter.seek_to_first();
195        self.iter = Some(iter);
196        Ok(())
197    }
198
199    fn next_block_smallest(&self) -> &[u8] {
200        self.sstable.meta.block_metas[self.next_block_index]
201            .smallest_key
202            .as_ref()
203    }
204
205    fn next_block_largest(&self) -> &[u8] {
206        if self.next_block_index + 1 < self.sstable.meta.block_metas.len() {
207            self.sstable.meta.block_metas[self.next_block_index + 1]
208                .smallest_key
209                .as_ref()
210        } else {
211            self.sstable.meta.largest_key.as_ref()
212        }
213    }
214
215    fn current_block_largest(&self) -> Vec<u8> {
216        if self.next_block_index < self.sstable.meta.block_metas.len() {
217            let mut largest_key = FullKey::decode(
218                self.sstable.meta.block_metas[self.next_block_index]
219                    .smallest_key
220                    .as_ref(),
221            );
222            // do not include this key because it is the smallest key of next block.
223            largest_key.epoch_with_gap = EpochWithGap::new_max_epoch();
224            largest_key.encode()
225        } else {
226            self.sstable.meta.largest_key.clone()
227        }
228    }
229
230    fn key(&self) -> FullKey<&[u8]> {
231        match self.iter.as_ref() {
232            Some(iter) => iter.key(),
233            None => FullKey::decode(
234                self.sstable.meta.block_metas[self.next_block_index]
235                    .smallest_key
236                    .as_ref(),
237            ),
238        }
239    }
240
241    pub(crate) fn is_valid(&self) -> bool {
242        self.iter.is_some() || self.next_block_index < self.sstable.meta.block_metas.len()
243    }
244
245    #[cfg(test)]
246    #[cfg(feature = "failpoints")]
247    pub(crate) fn iter_mut(&mut self) -> &mut BlockIterator {
248        self.iter.as_mut().unwrap()
249    }
250}
251
252impl Drop for BlockStreamIterator {
253    fn drop(&mut self) {
254        self.task_progress.dec_num_pending_read_io();
255    }
256}
257
/// Iterates over the KV-pairs of a given list of SSTs. The key-ranges of these SSTs are assumed to
/// be consecutive and non-overlapping.
pub struct ConcatSstableIterator {
    /// The iterator of the current table.
    /// `None` before the first seek and once `cur_idx` has moved past the last SST.
    sstable_iter: Option<BlockStreamIterator>,

    /// Current table index.
    cur_idx: usize,

    /// All non-overlapping tables.
    sstables: Vec<SstableInfo>,

    /// Store used to load SST metadata and handed to each per-SST iterator.
    sstable_store: SstableStoreRef,

    /// Local statistics; `remote_io_time` is shared with every per-SST iterator.
    stats: StoreLocalStatistic,
    /// Progress tracker; its pending-read-IO counter is incremented for each SST that is opened.
    task_progress: Arc<TaskProgress>,

    /// Retry budget passed to each per-SST iterator.
    max_io_retry_times: usize,
}
277
278impl ConcatSstableIterator {
279    /// Caller should make sure that `tables` are non-overlapping,
280    /// arranged in ascending order when it serves as a forward iterator,
281    /// and arranged in descending order when it serves as a backward iterator.
282    pub fn new(
283        sst_infos: Vec<SstableInfo>,
284        sstable_store: SstableStoreRef,
285        task_progress: Arc<TaskProgress>,
286        max_io_retry_times: usize,
287    ) -> Self {
288        Self {
289            sstable_iter: None,
290            cur_idx: 0,
291            sstables: sst_infos,
292            sstable_store,
293            task_progress,
294            stats: StoreLocalStatistic::default(),
295            max_io_retry_times,
296        }
297    }
298
299    pub async fn rewind(&mut self) -> HummockResult<()> {
300        self.seek_idx(0).await
301    }
302
303    pub async fn next_sstable(&mut self) -> HummockResult<()> {
304        self.seek_idx(self.cur_idx + 1).await
305    }
306
307    pub fn current_sstable(&mut self) -> &mut BlockStreamIterator {
308        self.sstable_iter.as_mut().unwrap()
309    }
310
311    pub async fn init_block_iter(&mut self) -> HummockResult<()> {
312        if let Some(sstable) = self.sstable_iter.as_mut() {
313            if sstable.iter.is_some() {
314                return Ok(());
315            }
316            let (buf, _, meta) = sstable.download_next_block().await?.unwrap();
317            sstable.init_block_iter(buf, meta.uncompressed_size as usize)?;
318        }
319        Ok(())
320    }
321
322    pub fn is_valid(&self) -> bool {
323        self.cur_idx < self.sstables.len()
324    }
325
326    /// Resets the iterator, loads the specified SST, and seeks in that SST to `seek_key` if given.
327    async fn seek_idx(&mut self, idx: usize) -> HummockResult<()> {
328        self.sstable_iter.take();
329        self.cur_idx = idx;
330        if self.cur_idx < self.sstables.len() {
331            let sstable_info = &self.sstables[self.cur_idx];
332            let sstable = self
333                .sstable_store
334                .sstable(sstable_info, &mut self.stats)
335                .instrument_await("stream_iter_sstable".verbose())
336                .await?;
337            self.task_progress.inc_num_pending_read_io();
338
339            let sstable_iter = BlockStreamIterator::new(
340                sstable,
341                self.task_progress.clone(),
342                self.sstable_store.clone(),
343                sstable_info.clone(),
344                self.max_io_retry_times,
345                self.stats.remote_io_time.clone(),
346            );
347            self.sstable_iter = Some(sstable_iter);
348        }
349        Ok(())
350    }
351}
352
/// Runner for the fast compaction path: merges exactly two input levels (`left` and `right`)
/// and, where possible, moves whole blocks into the output without decoding them.
pub struct CompactorRunner<C: CompactionFilter = MultiCompactionFilter> {
    /// Iterator over the SSTs of the first input level (`task.input_ssts[0]`).
    left: Box<ConcatSstableIterator>,
    /// Iterator over the SSTs of the second input level (`task.input_ssts[1]`).
    right: Box<ConcatSstableIterator>,
    /// Id of the compaction task, used for logging.
    task_id: u64,
    /// Processes decoded blocks key by key and owns the output SST builder.
    executor: CompactTaskExecutor<
        RemoteBuilderFactory<UnifiedSstableWriterFactory, BlockedXor16FilterBuilder>,
        C,
    >,
    /// Compression requested by the task; uncompressed raw blocks are recompressed to it.
    compression_algorithm: CompressionAlgorithm,
    metrics: Arc<CompactorMetrics>,
}
364
365impl<C: CompactionFilter> CompactorRunner<C> {
366    pub fn new(
367        context: CompactorContext,
368        task: CompactTask,
369        compaction_catalog_agent_ref: CompactionCatalogAgentRef,
370        object_id_getter: Arc<dyn GetObjectId>,
371        task_progress: Arc<TaskProgress>,
372        compaction_filter: C,
373    ) -> Self {
374        let mut options: SstableBuilderOptions = context.storage_opts.as_ref().into();
375        let compression_algorithm: CompressionAlgorithm = task.compression_algorithm.into();
376        options.compression_algorithm = compression_algorithm;
377        options.capacity = task.target_file_size as usize;
378        // Disable vnode key-range hints for fast compaction path by default.
379        options.max_vnode_key_range_bytes = None;
380        let get_id_time = Arc::new(AtomicU64::new(0));
381
382        debug_assert_eq!(
383            task.sstable_filter_kind,
384            PbSstableFilterType::SstableFilterXor16,
385            "fast compaction only supports blocked xor16 filter today"
386        );
387        debug_assert!(
388            task.should_use_block_based_filter(),
389            "fast compaction can only preserve blocked filters; expected blocked output"
390        );
391
392        let key_range = KeyRange::inf();
393        let read_table_ids = HashSet::from_iter(task.get_table_ids_from_input_ssts());
394
395        let task_config = TaskConfig {
396            key_range,
397            cache_policy: CachePolicy::NotFill,
398            gc_delete_keys: task.gc_delete_keys,
399            retain_multiple_version: false,
400            table_vnode_partition: task.table_vnode_partition.clone(),
401            use_block_based_filter: true,
402            sstable_filter_kind: task.sstable_filter_kind,
403            table_schemas: Default::default(),
404            disable_drop_column_optimization: false,
405        };
406        let factory = UnifiedSstableWriterFactory::new(context.sstable_store.clone());
407
408        let builder_factory = RemoteBuilderFactory::<_, BlockedXor16FilterBuilder> {
409            object_id_getter,
410            limiter: context.memory_limiter.clone(),
411            options,
412            policy: task_config.cache_policy,
413            remote_rpc_cost: get_id_time,
414            compaction_catalog_agent_ref: compaction_catalog_agent_ref.clone(),
415            sstable_writer_factory: factory,
416            _phantom: PhantomData,
417        };
418        let sst_builder = CapacitySplitTableBuilder::new(
419            builder_factory,
420            context.compactor_metrics.clone(),
421            Some(task_progress.clone()),
422            task_config.table_vnode_partition.clone(),
423            context
424                .storage_opts
425                .compactor_concurrent_uploading_sst_count,
426            compaction_catalog_agent_ref.clone(),
427        );
428        assert_eq!(
429            task.input_ssts.len(),
430            2,
431            "TaskId {} target_level {:?} task {:?}",
432            task.task_id,
433            task.target_level,
434            compact_task_to_string(&task)
435        );
436        let left_ssts = task.input_ssts[0]
437            .read_sstable_infos()
438            .cloned()
439            .collect_vec();
440        let right_ssts = task.input_ssts[1]
441            .read_sstable_infos()
442            .cloned()
443            .collect_vec();
444        assert!(
445            left_ssts
446                .iter()
447                .chain(right_ssts.iter())
448                .all(|sst| sst.bloom_filter_kind == BloomFilterType::Blocked),
449            "fast compaction requires blocked-filter SSTs: {}",
450            compact_task_to_string(&task)
451        );
452        let left = Box::new(ConcatSstableIterator::new(
453            left_ssts,
454            context.sstable_store.clone(),
455            task_progress.clone(),
456            context.storage_opts.compactor_iter_max_io_retry_times,
457        ));
458        let right = Box::new(ConcatSstableIterator::new(
459            right_ssts,
460            context.sstable_store,
461            task_progress.clone(),
462            context.storage_opts.compactor_iter_max_io_retry_times,
463        ));
464
465        // Can not consume the watermarks because the watermarks may be used by `check_compact_result`.
466        let pk_prefix_state = PkPrefixSkipWatermarkState::from_safe_epoch_watermarks(
467            task.pk_prefix_table_watermarks.clone(),
468        );
469        let non_pk_prefix_state = NonPkPrefixSkipWatermarkState::from_safe_epoch_watermarks(
470            task.non_pk_prefix_table_watermarks.clone(),
471            compaction_catalog_agent_ref.clone(),
472        );
473        let value_skip_watermark_state = ValueSkipWatermarkState::from_safe_epoch_watermarks(
474            task.value_table_watermarks.clone(),
475            compaction_catalog_agent_ref,
476        );
477
478        Self {
479            executor: CompactTaskExecutor::new(
480                sst_builder,
481                task_config,
482                task_progress,
483                pk_prefix_state,
484                non_pk_prefix_state,
485                value_skip_watermark_state,
486                compaction_filter,
487                read_table_ids,
488            ),
489            left,
490            right,
491            task_id: task.task_id,
492            metrics: context.compactor_metrics,
493            compression_algorithm,
494        }
495    }
496
497    pub async fn run(mut self) -> HummockResult<(Vec<LocalSstableInfo>, CompactionStatistics)> {
498        self.left.rewind().await?;
499        self.right.rewind().await?;
500        let mut skip_raw_block_count = 0;
501        let mut skip_raw_block_size = 0;
502        while self.left.is_valid() && self.right.is_valid() {
503            let ret = self
504                .left
505                .current_sstable()
506                .key()
507                .cmp(&self.right.current_sstable().key());
508            let (first, second) = if ret == Ordering::Less {
509                (&mut self.left, &mut self.right)
510            } else {
511                (&mut self.right, &mut self.left)
512            };
513            assert!(
514                ret != Ordering::Equal,
515                "sst range overlap equal_key {:?}",
516                self.left.current_sstable().key()
517            );
518            if first.current_sstable().iter.is_none() {
519                let right_key = second.current_sstable().key();
520                while first.current_sstable().is_valid() && !self.executor.builder.need_flush() {
521                    let full_key = FullKey::decode(first.current_sstable().next_block_largest());
522                    // the full key may be either Excluded key or Included key, so we do not allow
523                    // they equals.
524                    if full_key.user_key.ge(&right_key.user_key) {
525                        break;
526                    }
527                    let smallest_key =
528                        FullKey::decode(first.current_sstable().next_block_smallest());
529                    if !self.executor.shall_copy_raw_block(&smallest_key) {
530                        break;
531                    }
532                    let smallest_key = smallest_key.to_vec();
533
534                    let (mut block, filter_data, mut meta) = first
535                        .current_sstable()
536                        .download_next_block()
537                        .await?
538                        .unwrap();
539                    let algorithm = Block::get_algorithm(&block)?;
540                    if algorithm == CompressionAlgorithm::None
541                        && algorithm != self.compression_algorithm
542                    {
543                        block = BlockBuilder::compress_block(block, self.compression_algorithm)?;
544                        meta.len = block.len() as u32;
545                    }
546
547                    let largest_key = first.current_sstable().current_block_largest();
548                    let block_len = block.len() as u64;
549                    let block_key_count = meta.total_key_count;
550
551                    if self
552                        .executor
553                        .builder
554                        .add_raw_block(block, filter_data, smallest_key, largest_key, meta)
555                        .await?
556                    {
557                        skip_raw_block_size += block_len;
558                        skip_raw_block_count += 1;
559                    }
560                    self.executor.may_report_process_key(block_key_count);
561                    self.executor.clear();
562                }
563                if !first.current_sstable().is_valid() {
564                    first.next_sstable().await?;
565                    continue;
566                }
567                first.init_block_iter().await?;
568            }
569
570            let target_key = second.current_sstable().key();
571            let iter = first.sstable_iter.as_mut().unwrap().iter.as_mut().unwrap();
572            self.executor.reset_watermark();
573            self.executor.run(iter, target_key).await?;
574            if !iter.is_valid() {
575                first.sstable_iter.as_mut().unwrap().iter.take();
576                if !first.current_sstable().is_valid() {
577                    first.next_sstable().await?;
578                }
579            }
580        }
581        let rest_data = if !self.left.is_valid() {
582            &mut self.right
583        } else {
584            &mut self.left
585        };
586        if rest_data.is_valid() {
587            // compact rest keys of the current block.
588            let sstable_iter = rest_data.sstable_iter.as_mut().unwrap();
589            let target_key = FullKey::decode(&sstable_iter.sstable.meta.largest_key);
590            if let Some(iter) = sstable_iter.iter.as_mut() {
591                self.executor.reset_watermark();
592                self.executor.run(iter, target_key).await?;
593                assert!(
594                    !iter.is_valid(),
595                    "iter should not be valid key {:?}",
596                    iter.key()
597                );
598            }
599            sstable_iter.iter.take();
600        }
601
602        while rest_data.is_valid() {
603            let mut sstable_iter = rest_data.sstable_iter.take().unwrap();
604            while sstable_iter.is_valid() {
605                let smallest_key = FullKey::decode(sstable_iter.next_block_smallest()).to_vec();
606                let (block, filter_data, block_meta) =
607                    sstable_iter.download_next_block().await?.unwrap();
608                // If the last key is tombstone and it was deleted, the first key of this block must be deleted. So we can not move this block directly.
609                let need_deleted = self.executor.last_key.user_key.eq(&smallest_key.user_key)
610                    && self.executor.last_key_is_delete;
611                if self.executor.builder.need_flush()
612                    || need_deleted
613                    || !self.executor.shall_copy_raw_block(&smallest_key.to_ref())
614                {
615                    let largest_key = sstable_iter.sstable.meta.largest_key.clone();
616                    let target_key = FullKey::decode(&largest_key);
617                    sstable_iter.init_block_iter(block, block_meta.uncompressed_size as usize)?;
618                    let mut iter = sstable_iter.iter.take().unwrap();
619                    self.executor.reset_watermark();
620                    self.executor.run(&mut iter, target_key).await?;
621                } else {
622                    let largest_key = sstable_iter.current_block_largest();
623                    let block_len = block.len() as u64;
624                    let block_key_count = block_meta.total_key_count;
625                    if self
626                        .executor
627                        .builder
628                        .add_raw_block(block, filter_data, smallest_key, largest_key, block_meta)
629                        .await?
630                    {
631                        skip_raw_block_count += 1;
632                        skip_raw_block_size += block_len;
633                    }
634                    self.executor.may_report_process_key(block_key_count);
635                    self.executor.clear();
636                }
637            }
638            rest_data.next_sstable().await?;
639        }
640        let mut total_read_bytes = 0;
641        for sst in &self.left.sstables {
642            total_read_bytes += sst.sst_size;
643        }
644        for sst in &self.right.sstables {
645            total_read_bytes += sst.sst_size;
646        }
647        self.metrics
648            .compact_fast_runner_bytes
649            .inc_by(skip_raw_block_size);
650        tracing::info!(
651            "OPTIMIZATION: skip {} blocks for task-{}, optimize {}% data compression",
652            skip_raw_block_count,
653            self.task_id,
654            skip_raw_block_size * 100 / total_read_bytes,
655        );
656
657        let statistic = self.executor.take_statistics();
658        let output_ssts = self.executor.builder.finish().await?;
659        Compactor::report_progress(
660            self.metrics.clone(),
661            Some(self.executor.task_progress.clone()),
662            &output_ssts,
663            false,
664        );
665        let sst_infos = output_ssts
666            .iter()
667            .map(|sst| sst.sst_info.clone())
668            .collect_vec();
669        assert!(can_concat(&sst_infos));
670        Ok((output_ssts, statistic))
671    }
672}
673
/// Processes the keys of decoded blocks one by one, decides which of them are dropped, and
/// feeds the surviving ones into the output builder.
pub struct CompactTaskExecutor<F: TableBuilderFactory, C: CompactionFilter> {
    /// First key seen for the user key currently being processed; empty after `clear`.
    last_key: FullKey<Vec<u8>>,
    compaction_statistics: CompactionStatistics,
    /// Table whose drop statistics are currently accumulated in `last_table_stats`.
    last_table_id: Option<TableId>,
    /// Key/size deltas of dropped entries for `last_table_id`.
    last_table_stats: TableStats,
    /// Output SST builder.
    builder: CapacitySplitTableBuilder<F>,
    task_config: TaskConfig,
    task_progress: Arc<TaskProgress>,
    pk_prefix_skip_watermark_state: PkPrefixSkipWatermarkState,
    /// Set when an entry of the current user key was dropped as a delete tombstone or by a
    /// watermark; reset when a new user key starts and by `clear`.
    last_key_is_delete: bool,
    /// Keys processed since progress was last reported to `task_progress`.
    progress_key_num: u32,
    non_pk_prefix_skip_watermark_state: NonPkPrefixSkipWatermarkState,
    value_skip_watermark_state: ValueSkipWatermarkState,
    compaction_filter: C,
    /// Table ids that should be read; blocks of any other table are skipped.
    read_table_ids: HashSet<TableId>,
}
690
691impl<F: TableBuilderFactory, C: CompactionFilter> CompactTaskExecutor<F, C> {
692    pub fn new(
693        builder: CapacitySplitTableBuilder<F>,
694        task_config: TaskConfig,
695        task_progress: Arc<TaskProgress>,
696        pk_prefix_skip_watermark_state: PkPrefixSkipWatermarkState,
697        non_pk_prefix_skip_watermark_state: NonPkPrefixSkipWatermarkState,
698        value_skip_watermark_state: ValueSkipWatermarkState,
699        compaction_filter: C,
700        read_table_ids: HashSet<TableId>,
701    ) -> Self {
702        Self {
703            builder,
704            task_config,
705            last_key: FullKey::default(),
706            last_key_is_delete: false,
707            compaction_statistics: CompactionStatistics::default(),
708            last_table_id: None,
709            last_table_stats: TableStats::default(),
710            task_progress,
711            pk_prefix_skip_watermark_state,
712            progress_key_num: 0,
713            non_pk_prefix_skip_watermark_state,
714            value_skip_watermark_state,
715            compaction_filter,
716            read_table_ids,
717        }
718    }
719
720    fn take_statistics(&mut self) -> CompactionStatistics {
721        if let Some(last_table_id) = self.last_table_id.take() {
722            self.compaction_statistics
723                .delta_drop_stat
724                .insert(last_table_id, std::mem::take(&mut self.last_table_stats));
725        }
726        std::mem::take(&mut self.compaction_statistics)
727    }
728
729    fn clear(&mut self) {
730        if !self.last_key.is_empty() {
731            self.last_key = FullKey::default();
732        }
733        self.last_key_is_delete = false;
734    }
735
736    fn reset_watermark(&mut self) {
737        self.pk_prefix_skip_watermark_state.reset_watermark();
738        self.non_pk_prefix_skip_watermark_state.reset_watermark();
739        self.value_skip_watermark_state.reset_watermark();
740    }
741
742    #[inline(always)]
743    fn should_skip_block(&self, table_id: TableId) -> bool {
744        !self.read_table_ids.contains(&table_id)
745    }
746
747    #[inline(always)]
748    fn may_report_process_key(&mut self, key_count: u32) {
749        const PROGRESS_KEY_INTERVAL: u32 = 100;
750        self.progress_key_num += key_count;
751        if self.progress_key_num > PROGRESS_KEY_INTERVAL {
752            self.task_progress
753                .inc_progress_key(self.progress_key_num as u64);
754            self.progress_key_num = 0;
755        }
756    }
757
758    pub async fn run(
759        &mut self,
760        iter: &mut BlockIterator,
761        target_key: FullKey<&[u8]>,
762    ) -> HummockResult<()> {
763        if self.should_skip_block(iter.table_id()) {
764            iter.finish_block();
765            return Ok(());
766        }
767
768        while iter.is_valid() && iter.key().le(&target_key) {
769            let is_new_user_key =
770                !self.last_key.is_empty() && iter.key().user_key != self.last_key.user_key.as_ref();
771            self.compaction_statistics.iter_total_key_counts += 1;
772            self.may_report_process_key(1);
773
774            let mut drop = false;
775            let value = HummockValue::from_slice(iter.value()).unwrap();
776            let is_first_or_new_user_key = is_new_user_key || self.last_key.is_empty();
777            if is_first_or_new_user_key {
778                self.last_key.set(iter.key());
779                self.last_key_is_delete = false;
780            }
781
782            // See note in `compactor_runner.rs`.
783            if !self.task_config.retain_multiple_version
784                && self.task_config.gc_delete_keys
785                && value.is_delete()
786            {
787                drop = true;
788                self.last_key_is_delete = true;
789            } else if !self.task_config.retain_multiple_version && !is_first_or_new_user_key {
790                drop = true;
791            }
792
793            if !drop && self.compaction_filter.should_delete(iter.key()) {
794                drop = true;
795            }
796
797            if !drop && self.watermark_should_delete(&iter.key(), value) {
798                drop = true;
799                self.last_key_is_delete = true;
800            }
801
802            if self.last_table_id != Some(self.last_key.user_key.table_id) {
803                if let Some(last_table_id) = self.last_table_id.take() {
804                    self.compaction_statistics
805                        .delta_drop_stat
806                        .insert(last_table_id, std::mem::take(&mut self.last_table_stats));
807                }
808                self.last_table_id = Some(self.last_key.user_key.table_id);
809            }
810
811            if drop {
812                self.compaction_statistics.iter_drop_key_counts += 1;
813
814                self.last_table_stats.total_key_count -= 1;
815                self.last_table_stats.total_key_size -= self.last_key.encoded_len() as i64;
816                self.last_table_stats.total_value_size -= value.encoded_len() as i64;
817                iter.next();
818                continue;
819            }
820            self.builder
821                .add_full_key(iter.key(), value, is_new_user_key)
822                .await?;
823            iter.next();
824        }
825        Ok(())
826    }
827
828    pub fn shall_copy_raw_block(&mut self, smallest_key: &FullKey<&[u8]>) -> bool {
829        if self.should_skip_block(smallest_key.user_key.table_id) {
830            // If the table id of smallest key is not in read_table_ids, we can not copy the raw block.
831            return false;
832        }
833
834        if self.last_key_is_delete && self.last_key.user_key.as_ref().eq(&smallest_key.user_key) {
835            // If the last key is delete tombstone, we can not append the origin block
836            // because it would cause a deleted key could be see by user again.
837            return false;
838        }
839
840        if self.watermark_may_delete(smallest_key) {
841            return false;
842        }
843
844        // Check compaction filter
845        if self.compaction_filter.should_delete(*smallest_key) {
846            return false;
847        }
848
849        true
850    }
851
852    fn watermark_may_delete(&mut self, key: &FullKey<&[u8]>) -> bool {
853        // Correctness requires the assumption that these PkPrefixSkipWatermarkState and NonPkPrefixSkipWatermarkState never use the `unused_put`.
854        let pk_prefix_has_watermark = self.pk_prefix_skip_watermark_state.has_watermark();
855        let non_pk_prefix_has_watermark = self.non_pk_prefix_skip_watermark_state.has_watermark();
856        if pk_prefix_has_watermark || non_pk_prefix_has_watermark {
857            let unused = vec![];
858            let unused_put = HummockValue::Put(unused.as_slice());
859            if (pk_prefix_has_watermark
860                && self
861                    .pk_prefix_skip_watermark_state
862                    .should_delete(key, unused_put))
863                || (non_pk_prefix_has_watermark
864                    && self
865                        .non_pk_prefix_skip_watermark_state
866                        .should_delete(key, unused_put))
867            {
868                return true;
869            }
870        }
871        self.value_skip_watermark_state.has_watermark()
872            && self.value_skip_watermark_state.may_delete(key)
873    }
874
875    fn watermark_should_delete(
876        &mut self,
877        key: &FullKey<&[u8]>,
878        value: HummockValue<&[u8]>,
879    ) -> bool {
880        (self.pk_prefix_skip_watermark_state.has_watermark()
881            && self
882                .pk_prefix_skip_watermark_state
883                .should_delete(key, value))
884            || (self.non_pk_prefix_skip_watermark_state.has_watermark()
885                && self
886                    .non_pk_prefix_skip_watermark_state
887                    .should_delete(key, value))
888            || (self.value_skip_watermark_state.has_watermark()
889                && self.value_skip_watermark_state.should_delete(key, value))
890    }
891}
892
#[cfg(test)]
mod tests {
    use std::collections::{HashMap, VecDeque};
    use std::sync::Arc;

    use risingwave_common::catalog::TableId;
    use risingwave_common::hash::VirtualNode;
    use risingwave_common::util::epoch::test_epoch;
    use risingwave_hummock_sdk::compact_task::CompactTask;
    use risingwave_hummock_sdk::key::FullKey;
    use risingwave_hummock_sdk::level::InputLevel;
    use risingwave_pb::hummock::compact_task::TaskType;
    use risingwave_pb::hummock::{BloomFilterType, LevelType, PbSstableFilterType};

    use super::CompactorRunner;
    use crate::compaction_catalog_manager::CompactionCatalogAgent;
    use crate::hummock::compactor::compaction_utils::optimize_by_copy_block;
    use crate::hummock::compactor::task_progress::TaskProgress;
    use crate::hummock::compactor::{CompactorContext, MultiCompactionFilter};
    use crate::hummock::iterator::test_utils::mock_sstable_store;
    use crate::hummock::test_utils::{
        default_builder_opt_for_test, default_opts_for_test, gen_test_sstable_impl, test_value_of,
    };
    use crate::hummock::value::HummockValue;
    use crate::hummock::{
        BlockedXor16FilterBuilder, CachePolicy, SharedComapctorObjectIdManager, Xor16FilterBuilder,
    };
    use crate::monitor::CompactorMetrics;

    /// Builds a full key for `table_id` at `test_epoch(1)`. The table key is the big-endian
    /// bytes of `VirtualNode::ZERO` followed by `key_test_{idx}` with `idx` zero-padded to
    /// five digits.
    fn test_key(table_id: u32, idx: usize) -> FullKey<Vec<u8>> {
        let vnode_prefix = VirtualNode::ZERO.to_be_bytes();
        let suffix = format!("key_test_{idx:05}");

        let mut table_key: Vec<u8> = Vec::with_capacity(vnode_prefix.len() + suffix.len());
        table_key.extend_from_slice(&vnode_prefix[..]);
        table_key.extend_from_slice(suffix.as_bytes());

        FullKey::for_test(TableId::new(table_id), table_key, test_epoch(1))
    }

    /// Runs the fast compactor on a task whose first input level contains an SST with an
    /// emptied `table_ids` list next to a regular SST, and checks that the run succeeds.
    #[tokio::test]
    async fn test_fast_compact_skips_empty_table_id_sst() {
        let store = mock_sstable_store().await;
        let vnode_counts = HashMap::from([
            (1, VirtualNode::COUNT_FOR_TEST),
            (2, VirtualNode::COUNT_FOR_TEST),
        ]);
        let watermark_serdes = HashMap::from([(1, None), (2, None)]);

        // SST #1 holds two keys of table 1 and is built with the plain Xor16 filter builder.
        let mut emptied_sst = gen_test_sstable_impl::<_, Xor16FilterBuilder>(
            default_builder_opt_for_test(),
            1,
            (0..2).map(|i| (test_key(1, i), HummockValue::put(test_value_of(i)))),
            store.clone(),
            CachePolicy::NotFill,
            vnode_counts.clone(),
            watermark_serdes.clone(),
        )
        .await;
        assert_eq!(emptied_sst.bloom_filter_kind, BloomFilterType::Sstable);

        // Strip the table ids from the SST info while leaving everything else untouched.
        let mut emptied_inner = emptied_sst.get_inner();
        emptied_inner.table_ids.clear();
        emptied_sst.set_inner(emptied_inner);

        // SSTs #2 and #3 hold keys 0..2 and 2..4 of table 2 and use the blocked filter builder.
        let table2_low_sst = gen_test_sstable_impl::<_, BlockedXor16FilterBuilder>(
            default_builder_opt_for_test(),
            2,
            (0..2).map(|i| (test_key(2, i), HummockValue::put(test_value_of(i)))),
            store.clone(),
            CachePolicy::NotFill,
            vnode_counts.clone(),
            watermark_serdes.clone(),
        )
        .await;
        let table2_high_sst = gen_test_sstable_impl::<_, BlockedXor16FilterBuilder>(
            default_builder_opt_for_test(),
            3,
            (2..4).map(|i| (test_key(2, i), HummockValue::put(test_value_of(i)))),
            store.clone(),
            CachePolicy::NotFill,
            vnode_counts,
            watermark_serdes,
        )
        .await;

        let mut opts = default_opts_for_test();
        opts.enable_fast_compaction = true;
        opts.compactor_fast_max_compact_task_size = u64::MAX;
        opts.compactor_fast_max_compact_delete_ratio = 100;
        let context = CompactorContext::new_local_compact_context(
            Arc::new(opts),
            store,
            Arc::new(CompactorMetrics::unused()),
            None,
        );

        let upper_level = InputLevel {
            level_idx: 1,
            level_type: LevelType::Nonoverlapping,
            table_infos: vec![emptied_sst, table2_low_sst],
        };
        let lower_level = InputLevel {
            level_idx: 2,
            level_type: LevelType::Nonoverlapping,
            table_infos: vec![table2_high_sst],
        };
        let task = CompactTask {
            input_ssts: vec![upper_level, lower_level],
            task_id: 42,
            target_level: 2,
            existing_table_ids: vec![TableId::new(2)],
            target_file_size: 1 << 20,
            task_type: TaskType::Dynamic,
            blocked_xor_filter_kv_count_threshold: Some(0),
            sstable_filter_kind: PbSstableFilterType::SstableFilterXor16,
            ..Default::default()
        };

        // Only one of the two SSTs in the first level is expected to be reported as readable,
        // and the task must still qualify for the copy-block (fast) path.
        assert_eq!(task.input_ssts[0].read_sstable_infos().count(), 1);
        assert!(optimize_by_copy_block(&task, &context));

        let runner = CompactorRunner::new(
            context,
            task,
            CompactionCatalogAgent::for_test(vec![1, 2]),
            SharedComapctorObjectIdManager::for_test(VecDeque::from([100])),
            Arc::new(TaskProgress::default()),
            MultiCompactionFilter::default(),
        );
        runner.run().await.unwrap();
    }
}