risingwave_storage/hummock/compactor/
fast_compactor_runner.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::cmp::Ordering;
16use std::collections::HashSet;
17use std::marker::PhantomData;
18use std::sync::atomic::AtomicU64;
19use std::sync::{Arc, atomic};
20use std::time::Instant;
21
22use await_tree::{InstrumentAwait, SpanExt};
23use bytes::Bytes;
24use fail::fail_point;
25use itertools::Itertools;
26use risingwave_common::catalog::TableId;
27use risingwave_hummock_sdk::compact_task::CompactTask;
28use risingwave_hummock_sdk::key::FullKey;
29use risingwave_hummock_sdk::key_range::KeyRange;
30use risingwave_hummock_sdk::sstable_info::SstableInfo;
31use risingwave_hummock_sdk::table_stats::TableStats;
32use risingwave_hummock_sdk::{EpochWithGap, LocalSstableInfo, can_concat, compact_task_to_string};
33use risingwave_pb::hummock::{BloomFilterType, PbSstableFilterType};
34
35use crate::compaction_catalog_manager::CompactionCatalogAgentRef;
36use crate::hummock::block_stream::BlockDataStream;
37use crate::hummock::compactor::compaction_utils::{
38    blocked_xor_filter_key_count_threshold, estimate_output_key_count_for_task,
39};
40use crate::hummock::compactor::task_progress::TaskProgress;
41use crate::hummock::compactor::{
42    CompactionFilter, CompactionStatistics, Compactor, CompactorContext, MultiCompactionFilter,
43    RemoteBuilderFactory, TaskConfig,
44};
45use crate::hummock::iterator::{
46    NonPkPrefixSkipWatermarkState, PkPrefixSkipWatermarkState, SkipWatermarkState,
47    ValueSkipWatermarkState,
48};
49use crate::hummock::multi_builder::{CapacitySplitTableBuilder, TableBuilderFactory};
50use crate::hummock::sstable_store::SstableStoreRef;
51use crate::hummock::value::HummockValue;
52use crate::hummock::{
53    Block, BlockBuilder, BlockHolder, BlockIterator, BlockMeta, BlockedXor16FilterBuilder,
54    CachePolicy, CompressionAlgorithm, GetObjectId, HummockResult, SstableBuilderOptions,
55    TableHolder, UnifiedSstableWriterFactory,
56};
57use crate::monitor::{CompactorMetrics, StoreLocalStatistic};
58
/// Iterates over the KV-pairs of an SST while downloading it.
pub struct BlockStreamIterator {
    /// The downloading stream. It is `None` until the first download and is cleared before a
    /// retry, so that it gets recreated starting at `next_block_index`.
    block_stream: Option<BlockDataStream>,

    /// Index into `sstable.meta.block_metas` of the next block to download.
    next_block_index: usize,

    /// For key sanity check of divided SST and debugging
    sstable: TableHolder,
    /// Iterator over the KV-pairs of the currently decoded block, if any.
    iter: Option<BlockIterator>,
    /// Shared task progress; its pending-read-IO counter is decremented when this value drops.
    task_progress: Arc<TaskProgress>,

    // For block stream recreate
    sstable_store: SstableStoreRef,
    sstable_info: SstableInfo,
    /// Number of times the stream has been dropped for re-creation after an object error.
    io_retry_times: usize,
    /// Once `io_retry_times` reaches this value, object errors are returned to the caller.
    max_io_retry_times: usize,
    /// Accumulates the time spent in `download_next_block`, in milliseconds (rounded up).
    stats_ptr: Arc<AtomicU64>,
}
78
79impl BlockStreamIterator {
80    // We have to handle two internal iterators.
81    //   `block_stream`: iterates over the blocks of the table.
82    //     `block_iter`: iterates over the KV-pairs of the current block.
83    // These iterators work in different ways.
84
85    // BlockIterator works as follows: After new(), we call seek(). That brings us
86    // to the first element. Calling next() then brings us to the second element and does not
87    // return anything.
88
89    // BlockStream follows a different approach. After new(), we do not seek, instead next()
90    // returns the first value.
91
92    /// Initialises a new [`BlockStreamIterator`] which iterates over the given [`BlockDataStream`].
93    /// The iterator reads at most `max_block_count` from the stream.
94    pub fn new(
95        sstable: TableHolder,
96        task_progress: Arc<TaskProgress>,
97        sstable_store: SstableStoreRef,
98        sstable_info: SstableInfo,
99        max_io_retry_times: usize,
100        stats_ptr: Arc<AtomicU64>,
101    ) -> Self {
102        Self {
103            block_stream: None,
104            next_block_index: 0,
105            sstable,
106            iter: None,
107            task_progress,
108            sstable_store,
109            sstable_info,
110            io_retry_times: 0,
111            max_io_retry_times,
112            stats_ptr,
113        }
114    }
115
116    async fn create_stream(&mut self) -> HummockResult<()> {
117        // Fast compaction streams the physical SST blocks directly. Table-id pruning is handled
118        // later by `CompactTaskExecutor` before raw block copy or decoded block compaction.
119        let block_stream = self
120            .sstable_store
121            .get_stream_for_blocks(
122                self.sstable_info.object_id,
123                &self.sstable.meta.block_metas[self.next_block_index..],
124            )
125            .instrument_await("stream_iter_get_stream".verbose())
126            .await?;
127        self.block_stream = Some(block_stream);
128        Ok(())
129    }
130
131    /// Wrapper function for `self.block_stream.next()` which allows us to measure the time needed.
132    pub(crate) async fn download_next_block(
133        &mut self,
134    ) -> HummockResult<Option<(Bytes, Vec<u8>, BlockMeta)>> {
135        let now = Instant::now();
136        let _time_stat = scopeguard::guard(self.stats_ptr.clone(), |stats_ptr: Arc<AtomicU64>| {
137            let add = (now.elapsed().as_secs_f64() * 1000.0).ceil();
138            stats_ptr.fetch_add(add as u64, atomic::Ordering::Relaxed);
139        });
140        loop {
141            let ret = match &mut self.block_stream {
142                Some(block_stream) => block_stream.next_block_impl().await,
143                None => {
144                    self.create_stream().await?;
145                    continue;
146                }
147            };
148            match ret {
149                Ok(Some((data, _))) => {
150                    let meta = self.sstable.meta.block_metas[self.next_block_index].clone();
151                    let filter_block = self
152                        .sstable
153                        .filter_reader
154                        .get_block_raw_filter(self.next_block_index);
155                    self.next_block_index += 1;
156                    return Ok(Some((data, filter_block, meta)));
157                }
158
159                Ok(None) => break,
160
161                Err(e) => {
162                    if !e.is_object_error() || self.io_retry_times >= self.max_io_retry_times {
163                        return Err(e);
164                    }
165
166                    self.block_stream.take();
167                    self.io_retry_times += 1;
168                    fail_point!("create_stream_err");
169
170                    tracing::warn!(
171                        "fast compact retry create stream for sstable {} times, sstinfo={}",
172                        self.io_retry_times,
173                        format!(
174                            "object_id={}, sst_id={}, meta_offset={}, table_ids={:?}",
175                            self.sstable_info.object_id,
176                            self.sstable_info.sst_id,
177                            self.sstable_info.meta_offset,
178                            self.sstable_info.table_ids
179                        )
180                    );
181                }
182            }
183        }
184
185        self.next_block_index = self.sstable.meta.block_metas.len();
186        self.iter.take();
187        Ok(None)
188    }
189
190    pub(crate) fn init_block_iter(
191        &mut self,
192        buf: Bytes,
193        uncompressed_capacity: usize,
194    ) -> HummockResult<()> {
195        let block = Block::decode(buf, uncompressed_capacity)?;
196        let mut iter = BlockIterator::new(BlockHolder::from_owned_block(Box::new(block)));
197        iter.seek_to_first();
198        self.iter = Some(iter);
199        Ok(())
200    }
201
202    fn next_block_smallest(&self) -> &[u8] {
203        self.sstable.meta.block_metas[self.next_block_index]
204            .smallest_key
205            .as_ref()
206    }
207
208    fn next_block_largest(&self) -> &[u8] {
209        if self.next_block_index + 1 < self.sstable.meta.block_metas.len() {
210            self.sstable.meta.block_metas[self.next_block_index + 1]
211                .smallest_key
212                .as_ref()
213        } else {
214            self.sstable.meta.largest_key.as_ref()
215        }
216    }
217
218    fn current_block_largest(&self) -> Vec<u8> {
219        if self.next_block_index < self.sstable.meta.block_metas.len() {
220            let mut largest_key = FullKey::decode(
221                self.sstable.meta.block_metas[self.next_block_index]
222                    .smallest_key
223                    .as_ref(),
224            );
225            // do not include this key because it is the smallest key of next block.
226            largest_key.epoch_with_gap = EpochWithGap::new_max_epoch();
227            largest_key.encode()
228        } else {
229            self.sstable.meta.largest_key.clone()
230        }
231    }
232
233    fn key(&self) -> FullKey<&[u8]> {
234        match self.iter.as_ref() {
235            Some(iter) => iter.key(),
236            None => FullKey::decode(
237                self.sstable.meta.block_metas[self.next_block_index]
238                    .smallest_key
239                    .as_ref(),
240            ),
241        }
242    }
243
244    pub(crate) fn is_valid(&self) -> bool {
245        self.iter.is_some() || self.next_block_index < self.sstable.meta.block_metas.len()
246    }
247
    /// Mutable access to the iterator over the current block; only compiled for tests built
    /// with the `failpoints` feature.
    ///
    /// Panics if no block iterator is currently set.
    #[cfg(test)]
    #[cfg(feature = "failpoints")]
    pub(crate) fn iter_mut(&mut self) -> &mut BlockIterator {
        self.iter.as_mut().unwrap()
    }
253}
254
impl Drop for BlockStreamIterator {
    /// Decrements the pending-read-IO counter of the task progress.
    fn drop(&mut self) {
        // Counterpart of the `inc_num_pending_read_io` call made in
        // `ConcatSstableIterator::seek_idx` right before a `BlockStreamIterator` is built.
        self.task_progress.dec_num_pending_read_io();
    }
}
260
/// Iterates over the KV-pairs of a given list of SSTs. The key-ranges of these SSTs are assumed to
/// be consecutive and non-overlapping.
pub struct ConcatSstableIterator {
    /// The iterator of the current table. `None` before the first seek and once `cur_idx` has
    /// moved past the last table.
    sstable_iter: Option<BlockStreamIterator>,

    /// Current table index.
    cur_idx: usize,

    /// All non-overlapping tables.
    sstables: Vec<SstableInfo>,

    /// Store used to load SST metadata and to stream blocks.
    sstable_store: SstableStoreRef,

    /// Local statistics; its `remote_io_time` counter is shared with every block stream iterator.
    stats: StoreLocalStatistic,
    /// Shared task progress; the pending-read-IO counter is incremented per opened SST.
    task_progress: Arc<TaskProgress>,

    /// Retry limit handed to every `BlockStreamIterator` created by this iterator.
    max_io_retry_times: usize,
}
280
281impl ConcatSstableIterator {
282    /// Caller should make sure that `tables` are non-overlapping,
283    /// arranged in ascending order when it serves as a forward iterator,
284    /// and arranged in descending order when it serves as a backward iterator.
285    pub fn new(
286        sst_infos: Vec<SstableInfo>,
287        sstable_store: SstableStoreRef,
288        task_progress: Arc<TaskProgress>,
289        max_io_retry_times: usize,
290    ) -> Self {
291        Self {
292            sstable_iter: None,
293            cur_idx: 0,
294            sstables: sst_infos,
295            sstable_store,
296            task_progress,
297            stats: StoreLocalStatistic::default(),
298            max_io_retry_times,
299        }
300    }
301
302    pub async fn rewind(&mut self) -> HummockResult<()> {
303        self.seek_idx(0).await
304    }
305
306    pub async fn next_sstable(&mut self) -> HummockResult<()> {
307        self.seek_idx(self.cur_idx + 1).await
308    }
309
310    pub fn current_sstable(&mut self) -> &mut BlockStreamIterator {
311        self.sstable_iter.as_mut().unwrap()
312    }
313
314    pub async fn init_block_iter(&mut self) -> HummockResult<()> {
315        if let Some(sstable) = self.sstable_iter.as_mut() {
316            if sstable.iter.is_some() {
317                return Ok(());
318            }
319            let (buf, _, meta) = sstable.download_next_block().await?.unwrap();
320            sstable.init_block_iter(buf, meta.uncompressed_size as usize)?;
321        }
322        Ok(())
323    }
324
325    pub fn is_valid(&self) -> bool {
326        self.cur_idx < self.sstables.len()
327    }
328
329    /// Resets the iterator, loads the specified SST, and seeks in that SST to `seek_key` if given.
330    async fn seek_idx(&mut self, idx: usize) -> HummockResult<()> {
331        self.sstable_iter.take();
332        self.cur_idx = idx;
333        if self.cur_idx < self.sstables.len() {
334            let sstable_info = &self.sstables[self.cur_idx];
335            let sstable = self
336                .sstable_store
337                .sstable(sstable_info, &mut self.stats)
338                .instrument_await("stream_iter_sstable".verbose())
339                .await?;
340            self.task_progress.inc_num_pending_read_io();
341
342            let sstable_iter = BlockStreamIterator::new(
343                sstable,
344                self.task_progress.clone(),
345                self.sstable_store.clone(),
346                sstable_info.clone(),
347                self.max_io_retry_times,
348                self.stats.remote_io_time.clone(),
349            );
350            self.sstable_iter = Some(sstable_iter);
351        }
352        Ok(())
353    }
354}
355
/// Runner of a fast compaction task: merges two sorted SST sequences (`left` and `right`),
/// copying raw blocks into the output when possible and compacting decoded blocks otherwise.
pub struct CompactorRunner<C: CompactionFilter = MultiCompactionFilter> {
    /// SSTs read from `task.input_ssts[0]`.
    left: Box<ConcatSstableIterator>,
    /// SSTs read from `task.input_ssts[1]`.
    right: Box<ConcatSstableIterator>,
    /// Id of the compaction task, used for logging.
    task_id: u64,
    /// Key-level executor that owns the output SST builder.
    executor: CompactTaskExecutor<
        RemoteBuilderFactory<UnifiedSstableWriterFactory, BlockedXor16FilterBuilder>,
        C,
    >,
    /// Compression algorithm requested by the task for the output blocks.
    compression_algorithm: CompressionAlgorithm,
    metrics: Arc<CompactorMetrics>,
}
367
368impl<C: CompactionFilter> CompactorRunner<C> {
    /// Builds a runner for the fast-compaction `task`.
    ///
    /// # Panics
    ///
    /// Panics if `task.input_ssts` does not contain exactly two entries, or if any input SST
    /// does not use a blocked filter. In debug builds it additionally asserts that the task
    /// asks for an xor16 filter and for block-based filter output.
    pub fn new(
        context: CompactorContext,
        task: CompactTask,
        compaction_catalog_agent_ref: CompactionCatalogAgentRef,
        object_id_getter: Arc<dyn GetObjectId>,
        task_progress: Arc<TaskProgress>,
        compaction_filter: C,
    ) -> Self {
        // Start from the storage options and override them with the task's settings.
        let mut options: SstableBuilderOptions = context.storage_opts.as_ref().into();
        let compression_algorithm: CompressionAlgorithm = task.compression_algorithm.into();
        options.compression_algorithm = compression_algorithm;
        options.capacity = task.target_file_size as usize;
        let estimated_output_key_count =
            estimate_output_key_count_for_task(&task, options.capacity);
        options.estimated_output_key_count = Some(estimated_output_key_count);
        options.filter_hash_prealloc_key_count_cap =
            blocked_xor_filter_key_count_threshold(task.blocked_xor_filter_kv_count_threshold);
        // Disable vnode key-range hints for fast compaction path by default.
        options.max_vnode_key_range_bytes = None;
        let get_id_time = Arc::new(AtomicU64::new(0));

        debug_assert_eq!(
            task.sstable_filter_kind,
            PbSstableFilterType::SstableFilterXor16,
            "fast compaction only supports blocked xor16 filter today"
        );
        debug_assert!(
            task.should_use_block_based_filter_for_output(estimated_output_key_count as u64),
            "fast compaction can only preserve blocked filters; expected blocked output"
        );

        let key_range = KeyRange::inf();
        // Table ids present in the input SSTs; the executor skips blocks of any other table.
        let read_table_ids = HashSet::from_iter(task.get_table_ids_from_input_ssts());

        let task_config = TaskConfig {
            key_range,
            cache_policy: CachePolicy::NotFill,
            gc_delete_keys: task.gc_delete_keys,
            retain_multiple_version: false,
            table_vnode_partition: task.table_vnode_partition.clone(),
            use_block_based_filter: true,
            sstable_filter_kind: task.sstable_filter_kind,
            table_schemas: Default::default(),
            disable_drop_column_optimization: false,
        };
        let factory = UnifiedSstableWriterFactory::new(context.sstable_store.clone());

        let builder_factory = RemoteBuilderFactory::<_, BlockedXor16FilterBuilder> {
            object_id_getter,
            limiter: context.memory_limiter.clone(),
            options,
            policy: task_config.cache_policy,
            remote_rpc_cost: get_id_time,
            compaction_catalog_agent_ref: compaction_catalog_agent_ref.clone(),
            sstable_writer_factory: factory,
            _phantom: PhantomData,
        };
        let sst_builder = CapacitySplitTableBuilder::new(
            builder_factory,
            context.compactor_metrics.clone(),
            Some(task_progress.clone()),
            task_config.table_vnode_partition.clone(),
            context
                .storage_opts
                .compactor_concurrent_uploading_sst_count,
            compaction_catalog_agent_ref.clone(),
        );
        // The runner merges exactly two inputs: `input_ssts[0]` (left) and `input_ssts[1]` (right).
        assert_eq!(
            task.input_ssts.len(),
            2,
            "TaskId {} target_level {:?} task {:?}",
            task.task_id,
            task.target_level,
            compact_task_to_string(&task)
        );
        let left_ssts = task.input_ssts[0]
            .read_sstable_infos()
            .cloned()
            .collect_vec();
        let right_ssts = task.input_ssts[1]
            .read_sstable_infos()
            .cloned()
            .collect_vec();
        assert!(
            left_ssts
                .iter()
                .chain(right_ssts.iter())
                .all(|sst| sst.bloom_filter_kind == BloomFilterType::Blocked),
            "fast compaction requires blocked-filter SSTs: {}",
            compact_task_to_string(&task)
        );
        let left = Box::new(ConcatSstableIterator::new(
            left_ssts,
            context.sstable_store.clone(),
            task_progress.clone(),
            context.storage_opts.compactor_iter_max_io_retry_times,
        ));
        // `context.sstable_store` is moved here; this is its last use.
        let right = Box::new(ConcatSstableIterator::new(
            right_ssts,
            context.sstable_store,
            task_progress.clone(),
            context.storage_opts.compactor_iter_max_io_retry_times,
        ));

        // Can not consume the watermarks because the watermarks may be used by `check_compact_result`.
        let pk_prefix_state = PkPrefixSkipWatermarkState::from_safe_epoch_watermarks(
            task.pk_prefix_table_watermarks.clone(),
        );
        let non_pk_prefix_state = NonPkPrefixSkipWatermarkState::from_safe_epoch_watermarks(
            task.non_pk_prefix_table_watermarks.clone(),
            compaction_catalog_agent_ref.clone(),
        );
        let value_skip_watermark_state = ValueSkipWatermarkState::from_safe_epoch_watermarks(
            task.value_table_watermarks.clone(),
            compaction_catalog_agent_ref,
        );

        Self {
            executor: CompactTaskExecutor::new(
                sst_builder,
                task_config,
                task_progress,
                pk_prefix_state,
                non_pk_prefix_state,
                value_skip_watermark_state,
                compaction_filter,
                read_table_ids,
            ),
            left,
            right,
            task_id: task.task_id,
            metrics: context.compactor_metrics,
            compression_algorithm,
        }
    }
504
505    pub async fn run(mut self) -> HummockResult<(Vec<LocalSstableInfo>, CompactionStatistics)> {
506        self.left.rewind().await?;
507        self.right.rewind().await?;
508        let mut skip_raw_block_count = 0;
509        let mut skip_raw_block_size = 0;
510        while self.left.is_valid() && self.right.is_valid() {
511            let ret = self
512                .left
513                .current_sstable()
514                .key()
515                .cmp(&self.right.current_sstable().key());
516            let (first, second) = if ret == Ordering::Less {
517                (&mut self.left, &mut self.right)
518            } else {
519                (&mut self.right, &mut self.left)
520            };
521            assert!(
522                ret != Ordering::Equal,
523                "sst range overlap equal_key {:?}",
524                self.left.current_sstable().key()
525            );
526            if first.current_sstable().iter.is_none() {
527                let right_key = second.current_sstable().key();
528                while first.current_sstable().is_valid() && !self.executor.builder.need_flush() {
529                    let full_key = FullKey::decode(first.current_sstable().next_block_largest());
530                    // the full key may be either Excluded key or Included key, so we do not allow
531                    // they equals.
532                    if full_key.user_key.ge(&right_key.user_key) {
533                        break;
534                    }
535                    let smallest_key =
536                        FullKey::decode(first.current_sstable().next_block_smallest());
537                    if !self.executor.shall_copy_raw_block(&smallest_key) {
538                        break;
539                    }
540                    let smallest_key = smallest_key.to_vec();
541
542                    let (mut block, filter_data, mut meta) = first
543                        .current_sstable()
544                        .download_next_block()
545                        .await?
546                        .unwrap();
547                    let algorithm = Block::get_algorithm(&block)?;
548                    if algorithm == CompressionAlgorithm::None
549                        && algorithm != self.compression_algorithm
550                    {
551                        block = BlockBuilder::compress_block(block, self.compression_algorithm)?;
552                        meta.len = block.len() as u32;
553                    }
554
555                    let largest_key = first.current_sstable().current_block_largest();
556                    let block_len = block.len() as u64;
557                    let block_key_count = meta.total_key_count;
558
559                    if self
560                        .executor
561                        .builder
562                        .add_raw_block(block, filter_data, smallest_key, largest_key, meta)
563                        .await?
564                    {
565                        skip_raw_block_size += block_len;
566                        skip_raw_block_count += 1;
567                    }
568                    self.executor.may_report_process_key(block_key_count);
569                    self.executor.clear();
570                }
571                if !first.current_sstable().is_valid() {
572                    first.next_sstable().await?;
573                    continue;
574                }
575                first.init_block_iter().await?;
576            }
577
578            let target_key = second.current_sstable().key();
579            let iter = first.sstable_iter.as_mut().unwrap().iter.as_mut().unwrap();
580            self.executor.reset_watermark();
581            self.executor.run(iter, target_key).await?;
582            if !iter.is_valid() {
583                first.sstable_iter.as_mut().unwrap().iter.take();
584                if !first.current_sstable().is_valid() {
585                    first.next_sstable().await?;
586                }
587            }
588        }
589        let rest_data = if !self.left.is_valid() {
590            &mut self.right
591        } else {
592            &mut self.left
593        };
594        if rest_data.is_valid() {
595            // compact rest keys of the current block.
596            let sstable_iter = rest_data.sstable_iter.as_mut().unwrap();
597            let target_key = FullKey::decode(&sstable_iter.sstable.meta.largest_key);
598            if let Some(iter) = sstable_iter.iter.as_mut() {
599                self.executor.reset_watermark();
600                self.executor.run(iter, target_key).await?;
601                assert!(
602                    !iter.is_valid(),
603                    "iter should not be valid key {:?}",
604                    iter.key()
605                );
606            }
607            sstable_iter.iter.take();
608        }
609
610        while rest_data.is_valid() {
611            let mut sstable_iter = rest_data.sstable_iter.take().unwrap();
612            while sstable_iter.is_valid() {
613                let smallest_key = FullKey::decode(sstable_iter.next_block_smallest()).to_vec();
614                let (block, filter_data, block_meta) =
615                    sstable_iter.download_next_block().await?.unwrap();
616                // If the last key is tombstone and it was deleted, the first key of this block must be deleted. So we can not move this block directly.
617                let need_deleted = self.executor.last_key.user_key.eq(&smallest_key.user_key)
618                    && self.executor.last_key_is_delete;
619                if self.executor.builder.need_flush()
620                    || need_deleted
621                    || !self.executor.shall_copy_raw_block(&smallest_key.to_ref())
622                {
623                    let largest_key = sstable_iter.sstable.meta.largest_key.clone();
624                    let target_key = FullKey::decode(&largest_key);
625                    sstable_iter.init_block_iter(block, block_meta.uncompressed_size as usize)?;
626                    let mut iter = sstable_iter.iter.take().unwrap();
627                    self.executor.reset_watermark();
628                    self.executor.run(&mut iter, target_key).await?;
629                } else {
630                    let largest_key = sstable_iter.current_block_largest();
631                    let block_len = block.len() as u64;
632                    let block_key_count = block_meta.total_key_count;
633                    if self
634                        .executor
635                        .builder
636                        .add_raw_block(block, filter_data, smallest_key, largest_key, block_meta)
637                        .await?
638                    {
639                        skip_raw_block_count += 1;
640                        skip_raw_block_size += block_len;
641                    }
642                    self.executor.may_report_process_key(block_key_count);
643                    self.executor.clear();
644                }
645            }
646            rest_data.next_sstable().await?;
647        }
648        let mut total_read_bytes = 0;
649        for sst in &self.left.sstables {
650            total_read_bytes += sst.sst_size;
651        }
652        for sst in &self.right.sstables {
653            total_read_bytes += sst.sst_size;
654        }
655        self.metrics
656            .compact_fast_runner_bytes
657            .inc_by(skip_raw_block_size);
658        tracing::info!(
659            "OPTIMIZATION: skip {} blocks for task-{}, optimize {}% data compression",
660            skip_raw_block_count,
661            self.task_id,
662            skip_raw_block_size * 100 / total_read_bytes,
663        );
664
665        let statistic = self.executor.take_statistics();
666        let output_ssts = self.executor.builder.finish().await?;
667        Compactor::report_progress(
668            self.metrics.clone(),
669            Some(self.executor.task_progress.clone()),
670            &output_ssts,
671            false,
672        );
673        let sst_infos = output_ssts
674            .iter()
675            .map(|sst| sst.sst_info.clone())
676            .collect_vec();
677        assert!(can_concat(&sst_infos));
678        Ok((output_ssts, statistic))
679    }
680}
681
/// Key-level executor of a fast compaction task: decides per key whether it is kept or dropped
/// and owns the builder of the output SSTs.
pub struct CompactTaskExecutor<F: TableBuilderFactory, C: CompactionFilter> {
    /// First key seen for the user key currently being processed; emptied by `clear`.
    last_key: FullKey<Vec<u8>>,
    compaction_statistics: CompactionStatistics,
    /// Table whose drop statistics are currently accumulated in `last_table_stats`.
    last_table_id: Option<TableId>,
    last_table_stats: TableStats,
    builder: CapacitySplitTableBuilder<F>,
    task_config: TaskConfig,
    task_progress: Arc<TaskProgress>,
    pk_prefix_skip_watermark_state: PkPrefixSkipWatermarkState,
    /// Set when a version of the current user key was dropped as a delete or by a watermark.
    last_key_is_delete: bool,
    /// Keys processed since progress was last reported to `task_progress`.
    progress_key_num: u32,
    non_pk_prefix_skip_watermark_state: NonPkPrefixSkipWatermarkState,
    value_skip_watermark_state: ValueSkipWatermarkState,
    compaction_filter: C,
    /// Table ids to read; blocks of any other table are skipped by `run`.
    read_table_ids: HashSet<TableId>,
}
698
699impl<F: TableBuilderFactory, C: CompactionFilter> CompactTaskExecutor<F, C> {
700    pub fn new(
701        builder: CapacitySplitTableBuilder<F>,
702        task_config: TaskConfig,
703        task_progress: Arc<TaskProgress>,
704        pk_prefix_skip_watermark_state: PkPrefixSkipWatermarkState,
705        non_pk_prefix_skip_watermark_state: NonPkPrefixSkipWatermarkState,
706        value_skip_watermark_state: ValueSkipWatermarkState,
707        compaction_filter: C,
708        read_table_ids: HashSet<TableId>,
709    ) -> Self {
710        Self {
711            builder,
712            task_config,
713            last_key: FullKey::default(),
714            last_key_is_delete: false,
715            compaction_statistics: CompactionStatistics::default(),
716            last_table_id: None,
717            last_table_stats: TableStats::default(),
718            task_progress,
719            pk_prefix_skip_watermark_state,
720            progress_key_num: 0,
721            non_pk_prefix_skip_watermark_state,
722            value_skip_watermark_state,
723            compaction_filter,
724            read_table_ids,
725        }
726    }
727
728    fn take_statistics(&mut self) -> CompactionStatistics {
729        if let Some(last_table_id) = self.last_table_id.take() {
730            self.compaction_statistics
731                .delta_drop_stat
732                .insert(last_table_id, std::mem::take(&mut self.last_table_stats));
733        }
734        std::mem::take(&mut self.compaction_statistics)
735    }
736
737    fn clear(&mut self) {
738        if !self.last_key.is_empty() {
739            self.last_key = FullKey::default();
740        }
741        self.last_key_is_delete = false;
742    }
743
    /// Calls `reset_watermark` on each of the three skip-watermark states (pk-prefix,
    /// non-pk-prefix and value).
    fn reset_watermark(&mut self) {
        self.pk_prefix_skip_watermark_state.reset_watermark();
        self.non_pk_prefix_skip_watermark_state.reset_watermark();
        self.value_skip_watermark_state.reset_watermark();
    }
749
750    #[inline(always)]
751    fn should_skip_block(&self, table_id: TableId) -> bool {
752        !self.read_table_ids.contains(&table_id)
753    }
754
755    #[inline(always)]
756    fn may_report_process_key(&mut self, key_count: u32) {
757        const PROGRESS_KEY_INTERVAL: u32 = 100;
758        self.progress_key_num += key_count;
759        if self.progress_key_num > PROGRESS_KEY_INTERVAL {
760            self.task_progress
761                .inc_progress_key(self.progress_key_num as u64);
762            self.progress_key_num = 0;
763        }
764    }
765
766    pub async fn run(
767        &mut self,
768        iter: &mut BlockIterator,
769        target_key: FullKey<&[u8]>,
770    ) -> HummockResult<()> {
771        if self.should_skip_block(iter.table_id()) {
772            iter.finish_block();
773            return Ok(());
774        }
775
776        while iter.is_valid() && iter.key().le(&target_key) {
777            let is_new_user_key =
778                !self.last_key.is_empty() && iter.key().user_key != self.last_key.user_key.as_ref();
779            self.compaction_statistics.iter_total_key_counts += 1;
780            self.may_report_process_key(1);
781
782            let mut drop = false;
783            let value = HummockValue::from_slice(iter.value()).unwrap();
784            let is_first_or_new_user_key = is_new_user_key || self.last_key.is_empty();
785            if is_first_or_new_user_key {
786                self.last_key.set(iter.key());
787                self.last_key_is_delete = false;
788            }
789
790            // See note in `compactor_runner.rs`.
791            if !self.task_config.retain_multiple_version
792                && self.task_config.gc_delete_keys
793                && value.is_delete()
794            {
795                drop = true;
796                self.last_key_is_delete = true;
797            } else if !self.task_config.retain_multiple_version && !is_first_or_new_user_key {
798                drop = true;
799            }
800
801            if !drop && self.compaction_filter.should_delete(iter.key()) {
802                drop = true;
803            }
804
805            if !drop && self.watermark_should_delete(&iter.key(), value) {
806                drop = true;
807                self.last_key_is_delete = true;
808            }
809
810            if self.last_table_id != Some(self.last_key.user_key.table_id) {
811                if let Some(last_table_id) = self.last_table_id.take() {
812                    self.compaction_statistics
813                        .delta_drop_stat
814                        .insert(last_table_id, std::mem::take(&mut self.last_table_stats));
815                }
816                self.last_table_id = Some(self.last_key.user_key.table_id);
817            }
818
819            if drop {
820                self.compaction_statistics.iter_drop_key_counts += 1;
821
822                self.last_table_stats.total_key_count -= 1;
823                self.last_table_stats.total_key_size -= self.last_key.encoded_len() as i64;
824                self.last_table_stats.total_value_size -= value.encoded_len() as i64;
825                iter.next();
826                continue;
827            }
828            self.builder
829                .add_full_key(iter.key(), value, is_new_user_key)
830                .await?;
831            iter.next();
832        }
833        Ok(())
834    }
835
836    pub fn shall_copy_raw_block(&mut self, smallest_key: &FullKey<&[u8]>) -> bool {
837        if self.should_skip_block(smallest_key.user_key.table_id) {
838            // If the table id of smallest key is not in read_table_ids, we can not copy the raw block.
839            return false;
840        }
841
842        if self.last_key_is_delete && self.last_key.user_key.as_ref().eq(&smallest_key.user_key) {
843            // If the last key is delete tombstone, we can not append the origin block
844            // because it would cause a deleted key could be see by user again.
845            return false;
846        }
847
848        if self.watermark_may_delete(smallest_key) {
849            return false;
850        }
851
852        // Check compaction filter
853        if self.compaction_filter.should_delete(*smallest_key) {
854            return false;
855        }
856
857        true
858    }
859
860    fn watermark_may_delete(&mut self, key: &FullKey<&[u8]>) -> bool {
861        // Correctness requires the assumption that these PkPrefixSkipWatermarkState and NonPkPrefixSkipWatermarkState never use the `unused_put`.
862        let pk_prefix_has_watermark = self.pk_prefix_skip_watermark_state.has_watermark();
863        let non_pk_prefix_has_watermark = self.non_pk_prefix_skip_watermark_state.has_watermark();
864        if pk_prefix_has_watermark || non_pk_prefix_has_watermark {
865            let unused = vec![];
866            let unused_put = HummockValue::Put(unused.as_slice());
867            if (pk_prefix_has_watermark
868                && self
869                    .pk_prefix_skip_watermark_state
870                    .should_delete(key, unused_put))
871                || (non_pk_prefix_has_watermark
872                    && self
873                        .non_pk_prefix_skip_watermark_state
874                        .should_delete(key, unused_put))
875            {
876                return true;
877            }
878        }
879        self.value_skip_watermark_state.has_watermark()
880            && self.value_skip_watermark_state.may_delete(key)
881    }
882
883    fn watermark_should_delete(
884        &mut self,
885        key: &FullKey<&[u8]>,
886        value: HummockValue<&[u8]>,
887    ) -> bool {
888        (self.pk_prefix_skip_watermark_state.has_watermark()
889            && self
890                .pk_prefix_skip_watermark_state
891                .should_delete(key, value))
892            || (self.non_pk_prefix_skip_watermark_state.has_watermark()
893                && self
894                    .non_pk_prefix_skip_watermark_state
895                    .should_delete(key, value))
896            || (self.value_skip_watermark_state.has_watermark()
897                && self.value_skip_watermark_state.should_delete(key, value))
898    }
899}
900
#[cfg(test)]
mod tests {
    use std::collections::{HashMap, VecDeque};
    use std::sync::Arc;

    use risingwave_common::catalog::TableId;
    use risingwave_common::hash::VirtualNode;
    use risingwave_common::util::epoch::test_epoch;
    use risingwave_hummock_sdk::compact_task::CompactTask;
    use risingwave_hummock_sdk::key::FullKey;
    use risingwave_hummock_sdk::level::InputLevel;
    use risingwave_pb::hummock::compact_task::TaskType;
    use risingwave_pb::hummock::{BloomFilterType, LevelType, PbSstableFilterType};

    use super::CompactorRunner;
    use crate::compaction_catalog_manager::CompactionCatalogAgent;
    use crate::hummock::compactor::compaction_utils::optimize_by_copy_block;
    use crate::hummock::compactor::task_progress::TaskProgress;
    use crate::hummock::compactor::{CompactorContext, MultiCompactionFilter};
    use crate::hummock::iterator::test_utils::mock_sstable_store;
    use crate::hummock::test_utils::{
        default_builder_opt_for_test, default_opts_for_test, gen_test_sstable_impl, test_value_of,
    };
    use crate::hummock::value::HummockValue;
    use crate::hummock::{
        BlockedXor16FilterBuilder, CachePolicy, SharedComapctorObjectIdManager, Xor16FilterBuilder,
    };
    use crate::monitor::CompactorMetrics;

    /// Builds a full key for `table_id` at test epoch 1 whose table key is the
    /// big-endian bytes of `VirtualNode::ZERO` followed by `key_test_{idx:05}`.
    fn test_key(table_id: u32, idx: usize) -> FullKey<Vec<u8>> {
        let suffix = format!("key_test_{idx:05}");
        let mut table_key = VirtualNode::ZERO.to_be_bytes().to_vec();
        table_key.extend_from_slice(suffix.as_bytes());
        FullKey::for_test(TableId::new(table_id), table_key, test_epoch(1))
    }

    /// Runs a fast compaction over three SSTs, one of which (table 1) has had
    /// its `table_ids` cleared, and checks that the runner completes without
    /// error.
    #[tokio::test]
    async fn test_fast_compact_skips_empty_table_id_sst() {
        // `put` entries for `table_id`, one per index in `indices`.
        let put_entries = |table_id: u32, indices: std::ops::Range<usize>| {
            indices.map(move |idx| (test_key(table_id, idx), HummockValue::put(test_value_of(idx))))
        };

        let sstable_store = mock_sstable_store().await;
        let table_id_to_vnode = HashMap::from([
            (1, VirtualNode::COUNT_FOR_TEST),
            (2, VirtualNode::COUNT_FOR_TEST),
        ]);
        let table_id_to_watermark_serde = HashMap::from([(1, None), (2, None)]);

        // SST of table 1, built with the non-blocked filter builder.
        let mut dropped_only_sst = gen_test_sstable_impl::<_, Xor16FilterBuilder>(
            default_builder_opt_for_test(),
            1,
            put_entries(1, 0..2),
            sstable_store.clone(),
            CachePolicy::NotFill,
            table_id_to_vnode.clone(),
            table_id_to_watermark_serde.clone(),
        )
        .await;
        assert_eq!(dropped_only_sst.bloom_filter_kind, BloomFilterType::Sstable);
        // Clear the table ids recorded on this SST.
        let mut stripped_info = dropped_only_sst.get_inner();
        stripped_info.table_ids.clear();
        dropped_only_sst.set_inner(stripped_info);

        // Two SSTs of table 2 covering indices 0..2 and 2..4.
        let live_left_sst = gen_test_sstable_impl::<_, BlockedXor16FilterBuilder>(
            default_builder_opt_for_test(),
            2,
            put_entries(2, 0..2),
            sstable_store.clone(),
            CachePolicy::NotFill,
            table_id_to_vnode.clone(),
            table_id_to_watermark_serde.clone(),
        )
        .await;
        let live_right_sst = gen_test_sstable_impl::<_, BlockedXor16FilterBuilder>(
            default_builder_opt_for_test(),
            3,
            put_entries(2, 2..4),
            sstable_store.clone(),
            CachePolicy::NotFill,
            table_id_to_vnode,
            table_id_to_watermark_serde,
        )
        .await;

        let mut storage_opts = default_opts_for_test();
        storage_opts.enable_fast_compaction = true;
        storage_opts.compactor_fast_max_compact_task_size = u64::MAX;
        storage_opts.compactor_fast_max_compact_delete_ratio = 100;
        let context = CompactorContext::new_local_compact_context(
            Arc::new(storage_opts),
            sstable_store,
            Arc::new(CompactorMetrics::unused()),
            None,
        );

        let upper_level = InputLevel {
            level_idx: 1,
            level_type: LevelType::Nonoverlapping,
            table_infos: vec![dropped_only_sst, live_left_sst],
        };
        let lower_level = InputLevel {
            level_idx: 2,
            level_type: LevelType::Nonoverlapping,
            table_infos: vec![live_right_sst],
        };
        let task = CompactTask {
            input_ssts: vec![upper_level, lower_level],
            task_id: 42,
            target_level: 2,
            existing_table_ids: vec![TableId::new(2)],
            target_file_size: 1 << 20,
            task_type: TaskType::Dynamic,
            blocked_xor_filter_kv_count_threshold: Some(0),
            sstable_filter_kind: PbSstableFilterType::SstableFilterXor16,
            ..Default::default()
        };

        // Only one SST of the first input level is reported as readable.
        let readable_in_first_level = task.input_ssts[0].read_sstable_infos().count();
        assert_eq!(readable_in_first_level, 1);
        assert!(optimize_by_copy_block(&task, &context));

        let object_id_manager = SharedComapctorObjectIdManager::for_test(VecDeque::from([100]));
        let runner = CompactorRunner::new(
            context,
            task,
            CompactionCatalogAgent::for_test(vec![1, 2]),
            object_id_manager,
            Arc::new(TaskProgress::default()),
            MultiCompactionFilter::default(),
        );
        runner.run().await.unwrap();
    }
}