use std::cmp::Ordering;
use std::collections::{BTreeMap, HashSet};
use std::marker::PhantomData;
use std::sync::atomic::AtomicU64;
use std::sync::{Arc, atomic};
use std::time::Instant;

use await_tree::{InstrumentAwait, SpanExt};
use bytes::Bytes;
use fail::fail_point;
use itertools::Itertools;
use risingwave_common::catalog::TableId;
use risingwave_hummock_sdk::compact_task::CompactTask;
use risingwave_hummock_sdk::key::FullKey;
use risingwave_hummock_sdk::key_range::KeyRange;
use risingwave_hummock_sdk::sstable_info::SstableInfo;
use risingwave_hummock_sdk::table_stats::TableStats;
use risingwave_hummock_sdk::table_watermark::TableWatermarks;
use risingwave_hummock_sdk::{EpochWithGap, LocalSstableInfo, can_concat, compact_task_to_string};

use crate::compaction_catalog_manager::CompactionCatalogAgentRef;
use crate::hummock::block_stream::BlockDataStream;
use crate::hummock::compactor::task_progress::TaskProgress;
use crate::hummock::compactor::{
    CompactionFilter, CompactionStatistics, Compactor, CompactorContext, MultiCompactionFilter,
    RemoteBuilderFactory, TaskConfig,
};
use crate::hummock::iterator::{
    NonPkPrefixSkipWatermarkState, PkPrefixSkipWatermarkState, SkipWatermarkState,
    ValueSkipWatermarkState,
};
use crate::hummock::multi_builder::{CapacitySplitTableBuilder, TableBuilderFactory};
use crate::hummock::sstable_store::SstableStoreRef;
use crate::hummock::value::HummockValue;
use crate::hummock::{
    Block, BlockBuilder, BlockHolder, BlockIterator, BlockMeta, BlockedXor16FilterBuilder,
    CachePolicy, CompressionAlgorithm, GetObjectId, HummockResult, SstableBuilderOptions,
    TableHolder, UnifiedSstableWriterFactory,
};
use crate::monitor::{CompactorMetrics, StoreLocalStatistic};
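/// Iterates over the blocks of a single sstable. Blocks are downloaded lazily through a
/// `BlockDataStream`; each downloaded block is returned as raw bytes together with its filter
/// data and `BlockMeta`, so the caller can either copy it verbatim or open a `BlockIterator` on it.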
pub struct BlockStreamIterator {
    /// The downloading stream of the sstable's blocks.
    block_stream: Option<BlockDataStream>,

    /// Index of the next block to download from the stream.
    next_block_index: usize,

    /// The sstable the blocks belong to.
    sstable: TableHolder,
    iter: Option<BlockIterator>,
    task_progress: Arc<TaskProgress>,

    sstable_store: SstableStoreRef,
    sstable_info: SstableInfo,
    io_retry_times: usize,
    max_io_retry_times: usize,
    stats_ptr: Arc<AtomicU64>,
}

impl BlockStreamIterator {
    pub fn new(
        sstable: TableHolder,
        task_progress: Arc<TaskProgress>,
        sstable_store: SstableStoreRef,
        sstable_info: SstableInfo,
        max_io_retry_times: usize,
        stats_ptr: Arc<AtomicU64>,
    ) -> Self {
        Self {
            block_stream: None,
            next_block_index: 0,
            sstable,
            iter: None,
            task_progress,
            sstable_store,
            sstable_info,
            io_retry_times: 0,
            max_io_retry_times,
            stats_ptr,
        }
    }
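    /// (Re-)creates the block stream, starting from the first block that has not been downloaded yet.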
    async fn create_stream(&mut self) -> HummockResult<()> {
        let block_stream = self
            .sstable_store
            .get_stream_for_blocks(
                self.sstable_info.object_id,
                &self.sstable.meta.block_metas[self.next_block_index..],
            )
            .instrument_await("stream_iter_get_stream".verbose())
            .await?;
        self.block_stream = Some(block_stream);
        Ok(())
    }
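    /// Downloads the next block and returns its raw data, filter data and `BlockMeta`, or `None`
    /// when the sstable is exhausted. The time spent is recorded in `stats_ptr`, and object-store
    /// errors are retried by recreating the stream, up to `max_io_retry_times` times.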
    pub(crate) async fn download_next_block(
        &mut self,
    ) -> HummockResult<Option<(Bytes, Vec<u8>, BlockMeta)>> {
        let now = Instant::now();
        let _time_stat = scopeguard::guard(self.stats_ptr.clone(), |stats_ptr: Arc<AtomicU64>| {
            let add = (now.elapsed().as_secs_f64() * 1000.0).ceil();
            stats_ptr.fetch_add(add as u64, atomic::Ordering::Relaxed);
        });
        loop {
            let ret = match &mut self.block_stream {
                Some(block_stream) => block_stream.next_block_impl().await,
                None => {
                    self.create_stream().await?;
                    continue;
                }
            };
            match ret {
                Ok(Some((data, _))) => {
                    let meta = self.sstable.meta.block_metas[self.next_block_index].clone();
                    let filter_block = self
                        .sstable
                        .filter_reader
                        .get_block_raw_filter(self.next_block_index);
                    self.next_block_index += 1;
                    return Ok(Some((data, filter_block, meta)));
                }

                Ok(None) => break,

                Err(e) => {
                    if !e.is_object_error() || self.io_retry_times >= self.max_io_retry_times {
                        return Err(e);
                    }

                    self.block_stream.take();
                    self.io_retry_times += 1;
                    fail_point!("create_stream_err");

                    tracing::warn!(
                        "fast compact retry create stream for sstable {} times, sstinfo={}",
                        self.io_retry_times,
                        format!(
                            "object_id={}, sst_id={}, meta_offset={}, table_ids={:?}",
                            self.sstable_info.object_id,
                            self.sstable_info.sst_id,
                            self.sstable_info.meta_offset,
                            self.sstable_info.table_ids
                        )
                    );
                }
            }
        }

        self.next_block_index = self.sstable.meta.block_metas.len();
        self.iter.take();
        Ok(None)
    }
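    /// Decodes a downloaded block and positions a `BlockIterator` at its first entry.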
    pub(crate) fn init_block_iter(
        &mut self,
        buf: Bytes,
        uncompressed_capacity: usize,
    ) -> HummockResult<()> {
        let block = Block::decode(buf, uncompressed_capacity)?;
        let mut iter = BlockIterator::new(BlockHolder::from_owned_block(Box::new(block)));
        iter.seek_to_first();
        self.iter = Some(iter);
        Ok(())
    }
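    /// Returns the smallest key of the next (not yet downloaded) block.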
    fn next_block_smallest(&self) -> &[u8] {
        self.sstable.meta.block_metas[self.next_block_index]
            .smallest_key
            .as_ref()
    }

    fn next_block_largest(&self) -> &[u8] {
        if self.next_block_index + 1 < self.sstable.meta.block_metas.len() {
            self.sstable.meta.block_metas[self.next_block_index + 1]
                .smallest_key
                .as_ref()
        } else {
            self.sstable.meta.largest_key.as_ref()
        }
    }

    fn current_block_largest(&self) -> Vec<u8> {
        if self.next_block_index < self.sstable.meta.block_metas.len() {
            let mut largest_key = FullKey::decode(
                self.sstable.meta.block_metas[self.next_block_index]
                    .smallest_key
                    .as_ref(),
            );
            // With the max epoch this bound sorts before every real version of the next block's
            // first user key, so it is a tight upper bound for the current block.
            largest_key.epoch_with_gap = EpochWithGap::new_max_epoch();
            largest_key.encode()
        } else {
            self.sstable.meta.largest_key.clone()
        }
    }

    fn key(&self) -> FullKey<&[u8]> {
        match self.iter.as_ref() {
            Some(iter) => iter.key(),
            None => FullKey::decode(
                self.sstable.meta.block_metas[self.next_block_index]
                    .smallest_key
                    .as_ref(),
            ),
        }
    }

    pub(crate) fn is_valid(&self) -> bool {
        self.iter.is_some() || self.next_block_index < self.sstable.meta.block_metas.len()
    }

    #[cfg(test)]
    #[cfg(feature = "failpoints")]
    pub(crate) fn iter_mut(&mut self) -> &mut BlockIterator {
        self.iter.as_mut().unwrap()
    }
}

impl Drop for BlockStreamIterator {
    fn drop(&mut self) {
        self.task_progress.dec_num_pending_read_io();
    }
}
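/// Iterates a list of non-overlapping sstables in order, streaming the blocks of one sstable at a
/// time through a `BlockStreamIterator`.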
pub struct ConcatSstableIterator {
    /// The iterator of the current sstable.
    sstable_iter: Option<BlockStreamIterator>,

    /// Current sstable index.
    cur_idx: usize,

    /// All non-overlapping sstables.
    sstables: Vec<SstableInfo>,

    sstable_store: SstableStoreRef,

    stats: StoreLocalStatistic,
    task_progress: Arc<TaskProgress>,

    max_io_retry_times: usize,
}

impl ConcatSstableIterator {
    pub fn new(
        sst_infos: Vec<SstableInfo>,
        sstable_store: SstableStoreRef,
        task_progress: Arc<TaskProgress>,
        max_io_retry_times: usize,
    ) -> Self {
        Self {
            sstable_iter: None,
            cur_idx: 0,
            sstables: sst_infos,
            sstable_store,
            task_progress,
            stats: StoreLocalStatistic::default(),
            max_io_retry_times,
        }
    }

    pub async fn rewind(&mut self) -> HummockResult<()> {
        self.seek_idx(0).await
    }

    pub async fn next_sstable(&mut self) -> HummockResult<()> {
        self.seek_idx(self.cur_idx + 1).await
    }

    pub fn current_sstable(&mut self) -> &mut BlockStreamIterator {
        self.sstable_iter.as_mut().unwrap()
    }

    pub async fn init_block_iter(&mut self) -> HummockResult<()> {
        if let Some(sstable) = self.sstable_iter.as_mut() {
            if sstable.iter.is_some() {
                return Ok(());
            }
            let (buf, _, meta) = sstable.download_next_block().await?.unwrap();
            sstable.init_block_iter(buf, meta.uncompressed_size as usize)?;
        }
        Ok(())
    }

    pub fn is_valid(&self) -> bool {
        self.cur_idx < self.sstables.len()
    }
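    /// Seeks to the sstable at `idx`: drops the current block stream, loads the sstable's meta and
    /// prepares a new `BlockStreamIterator` for it if `idx` is within range.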
    async fn seek_idx(&mut self, idx: usize) -> HummockResult<()> {
        self.sstable_iter.take();
        self.cur_idx = idx;
        if self.cur_idx < self.sstables.len() {
            let sstable_info = &self.sstables[self.cur_idx];
            let sstable = self
                .sstable_store
                .sstable(sstable_info, &mut self.stats)
                .instrument_await("stream_iter_sstable".verbose())
                .await?;
            self.task_progress.inc_num_pending_read_io();

            let sstable_iter = BlockStreamIterator::new(
                sstable,
                self.task_progress.clone(),
                self.sstable_store.clone(),
                sstable_info.clone(),
                self.max_io_retry_times,
                self.stats.remote_io_time.clone(),
            );
            self.sstable_iter = Some(sstable_iter);
        }
        Ok(())
    }
}
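/// Fast compaction runner. It merges exactly two sorted inputs; whenever an entire block of the
/// side that is currently behind sorts strictly below the other side's key and needs no per-key
/// processing (no pending delete, watermark or compaction-filter work), the block is appended to
/// the output as raw bytes instead of being decoded and re-encoded.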
pub struct CompactorRunner<C: CompactionFilter = MultiCompactionFilter> {
    left: Box<ConcatSstableIterator>,
    right: Box<ConcatSstableIterator>,
    task_id: u64,
    executor: CompactTaskExecutor<
        RemoteBuilderFactory<UnifiedSstableWriterFactory, BlockedXor16FilterBuilder>,
        C,
    >,
    compression_algorithm: CompressionAlgorithm,
    metrics: Arc<CompactorMetrics>,
}

impl<C: CompactionFilter> CompactorRunner<C> {
    pub fn new(
        context: CompactorContext,
        task: CompactTask,
        compaction_catalog_agent_ref: CompactionCatalogAgentRef,
        object_id_getter: Arc<dyn GetObjectId>,
        task_progress: Arc<TaskProgress>,
        compaction_filter: C,
    ) -> Self {
        let mut options: SstableBuilderOptions = context.storage_opts.as_ref().into();
        let compression_algorithm: CompressionAlgorithm = task.compression_algorithm.into();
        options.compression_algorithm = compression_algorithm;
        options.capacity = task.target_file_size as usize;
        let get_id_time = Arc::new(AtomicU64::new(0));

        let key_range = KeyRange::inf();

        let task_config = TaskConfig {
            key_range,
            cache_policy: CachePolicy::NotFill,
            gc_delete_keys: task.gc_delete_keys,
            retain_multiple_version: false,
            stats_target_table_ids: Some(HashSet::from_iter(task.existing_table_ids.clone())),
            task_type: task.task_type,
            table_vnode_partition: task.table_vnode_partition.clone(),
            use_block_based_filter: true,
            table_schemas: Default::default(),
            disable_drop_column_optimization: false,
        };
        let factory = UnifiedSstableWriterFactory::new(context.sstable_store.clone());

        let builder_factory = RemoteBuilderFactory::<_, BlockedXor16FilterBuilder> {
            object_id_getter,
            limiter: context.memory_limiter.clone(),
            options,
            policy: task_config.cache_policy,
            remote_rpc_cost: get_id_time,
            compaction_catalog_agent_ref: compaction_catalog_agent_ref.clone(),
            sstable_writer_factory: factory,
            _phantom: PhantomData,
        };
        let sst_builder = CapacitySplitTableBuilder::new(
            builder_factory,
            context.compactor_metrics.clone(),
            Some(task_progress.clone()),
            task_config.table_vnode_partition.clone(),
            context
                .storage_opts
                .compactor_concurrent_uploading_sst_count,
            compaction_catalog_agent_ref.clone(),
        );
        assert_eq!(
            task.input_ssts.len(),
            2,
            "TaskId {} target_level {:?} task {:?}",
            task.task_id,
            task.target_level,
            compact_task_to_string(&task)
        );
        let left = Box::new(ConcatSstableIterator::new(
            task.input_ssts[0].table_infos.clone(),
            context.sstable_store.clone(),
            task_progress.clone(),
            context.storage_opts.compactor_iter_max_io_retry_times,
        ));
        let right = Box::new(ConcatSstableIterator::new(
            task.input_ssts[1].table_infos.clone(),
            context.sstable_store,
            task_progress.clone(),
            context.storage_opts.compactor_iter_max_io_retry_times,
        ));

        let pk_prefix_state = PkPrefixSkipWatermarkState::from_safe_epoch_watermarks(
            task.pk_prefix_table_watermarks.clone(),
        );
        let non_pk_prefix_state = NonPkPrefixSkipWatermarkState::from_safe_epoch_watermarks(
            task.non_pk_prefix_table_watermarks.clone(),
            compaction_catalog_agent_ref.clone(),
        );
        let value_table_watermarks: BTreeMap<TableId, TableWatermarks> = BTreeMap::default();
        let value_skip_watermark_state = ValueSkipWatermarkState::from_safe_epoch_watermarks(
            value_table_watermarks,
            compaction_catalog_agent_ref,
        );

        Self {
            executor: CompactTaskExecutor::new(
                sst_builder,
                task_config,
                task_progress,
                pk_prefix_state,
                non_pk_prefix_state,
                value_skip_watermark_state,
                compaction_filter,
            ),
            left,
            right,
            task_id: task.task_id,
            metrics: context.compactor_metrics,
            compression_algorithm,
        }
    }
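    /// Merges the two inputs into new sstables. While both sides are valid, the side that is
    /// behind either copies whole blocks raw (when `shall_copy_raw_block` allows it) or decodes
    /// the current block and feeds it through `CompactTaskExecutor::run` up to the other side's
    /// key. The remaining side is then drained the same way.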
    pub async fn run(mut self) -> HummockResult<(Vec<LocalSstableInfo>, CompactionStatistics)> {
        self.left.rewind().await?;
        self.right.rewind().await?;
        let mut skip_raw_block_count = 0;
        let mut skip_raw_block_size = 0;
        while self.left.is_valid() && self.right.is_valid() {
            let ret = self
                .left
                .current_sstable()
                .key()
                .cmp(&self.right.current_sstable().key());
            let (first, second) = if ret == Ordering::Less {
                (&mut self.left, &mut self.right)
            } else {
                (&mut self.right, &mut self.left)
            };
            assert!(
                ret != Ordering::Equal,
                "sst range overlap equal_key {:?}",
                self.left.current_sstable().key()
            );
            if first.current_sstable().iter.is_none() {
                let right_key = second.current_sstable().key();
                while first.current_sstable().is_valid() && !self.executor.builder.need_flush() {
                    let full_key = FullKey::decode(first.current_sstable().next_block_largest());
                    if full_key.user_key.ge(&right_key.user_key) {
                        // The next block may overlap with the other side, so stop copying raw
                        // blocks and fall back to the per-key path.
                        break;
                    }
                    let smallest_key =
                        FullKey::decode(first.current_sstable().next_block_smallest());
                    if !self.executor.shall_copy_raw_block(&smallest_key) {
                        break;
                    }
                    let smallest_key = smallest_key.to_vec();

                    let (mut block, filter_data, mut meta) = first
                        .current_sstable()
                        .download_next_block()
                        .await?
                        .unwrap();
                    let algorithm = Block::get_algorithm(&block)?;
                    if algorithm == CompressionAlgorithm::None
                        && algorithm != self.compression_algorithm
                    {
                        block = BlockBuilder::compress_block(block, self.compression_algorithm)?;
                        meta.len = block.len() as u32;
                    }

                    let largest_key = first.current_sstable().current_block_largest();
                    let block_len = block.len() as u64;
                    let block_key_count = meta.total_key_count;

                    if self
                        .executor
                        .builder
                        .add_raw_block(block, filter_data, smallest_key, largest_key, meta)
                        .await?
                    {
                        skip_raw_block_size += block_len;
                        skip_raw_block_count += 1;
                    }
                    self.executor.may_report_process_key(block_key_count);
                    self.executor.clear();
                }
                if !first.current_sstable().is_valid() {
                    first.next_sstable().await?;
                    continue;
                }
                first.init_block_iter().await?;
            }

            let target_key = second.current_sstable().key();
            let iter = first.sstable_iter.as_mut().unwrap().iter.as_mut().unwrap();
            self.executor.run(iter, target_key).await?;
            if !iter.is_valid() {
                first.sstable_iter.as_mut().unwrap().iter.take();
                if !first.current_sstable().is_valid() {
                    first.next_sstable().await?;
                }
            }
        }
        let rest_data = if !self.left.is_valid() {
            &mut self.right
        } else {
            &mut self.left
        };
        if rest_data.is_valid() {
            let sstable_iter = rest_data.sstable_iter.as_mut().unwrap();
            // Finish compacting the keys remaining in the currently opened block.
            let target_key = FullKey::decode(&sstable_iter.sstable.meta.largest_key);
            if let Some(iter) = sstable_iter.iter.as_mut() {
                self.executor.run(iter, target_key).await?;
                assert!(
                    !iter.is_valid(),
                    "iter should not be valid key {:?}",
                    iter.key()
                );
            }
            sstable_iter.iter.take();
        }

        while rest_data.is_valid() {
            let mut sstable_iter = rest_data.sstable_iter.take().unwrap();
            while sstable_iter.is_valid() {
                let smallest_key = FullKey::decode(sstable_iter.next_block_smallest()).to_vec();
                let (block, filter_data, block_meta) =
                    sstable_iter.download_next_block().await?.unwrap();
                // If the last seen key was a delete tombstone for the same user key as this
                // block's first key, the block must be processed per key so that stale versions
                // can be dropped.
                let need_deleted = self.executor.last_key.user_key.eq(&smallest_key.user_key)
                    && self.executor.last_key_is_delete;
                if self.executor.builder.need_flush()
                    || need_deleted
                    || !self.executor.shall_copy_raw_block(&smallest_key.to_ref())
                {
                    let largest_key = sstable_iter.sstable.meta.largest_key.clone();
                    let target_key = FullKey::decode(&largest_key);
                    sstable_iter.init_block_iter(block, block_meta.uncompressed_size as usize)?;
                    let mut iter = sstable_iter.iter.take().unwrap();
                    self.executor.run(&mut iter, target_key).await?;
                } else {
                    let largest_key = sstable_iter.current_block_largest();
                    let block_len = block.len() as u64;
                    let block_key_count = block_meta.total_key_count;
                    if self
                        .executor
                        .builder
                        .add_raw_block(block, filter_data, smallest_key, largest_key, block_meta)
                        .await?
                    {
                        skip_raw_block_count += 1;
                        skip_raw_block_size += block_len;
                    }
                    self.executor.may_report_process_key(block_key_count);
                    self.executor.clear();
                }
            }
            rest_data.next_sstable().await?;
        }
        let mut total_read_bytes = 0;
        for sst in &self.left.sstables {
            total_read_bytes += sst.sst_size;
        }
        for sst in &self.right.sstables {
            total_read_bytes += sst.sst_size;
        }
        self.metrics
            .compact_fast_runner_bytes
            .inc_by(skip_raw_block_size);
        tracing::info!(
            "OPTIMIZATION: skip {} blocks for task-{}, optimize {}% data compression",
            skip_raw_block_count,
            self.task_id,
            skip_raw_block_size * 100 / total_read_bytes,
        );

        let statistic = self.executor.take_statistics();
        let output_ssts = self.executor.builder.finish().await?;
        Compactor::report_progress(
            self.metrics.clone(),
            Some(self.executor.task_progress.clone()),
            &output_ssts,
            false,
        );
        let sst_infos = output_ssts
            .iter()
            .map(|sst| sst.sst_info.clone())
            .collect_vec();
        assert!(can_concat(&sst_infos));
        Ok((output_ssts, statistic))
    }
}
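/// Performs per-key compaction for the blocks that cannot be copied raw: keys are pushed into the
/// sstable builder after applying delete-key GC, multi-version de-duplication, the compaction
/// filter and table watermarks, while per-table drop statistics are accumulated.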
pub struct CompactTaskExecutor<F: TableBuilderFactory, C: CompactionFilter> {
    last_key: FullKey<Vec<u8>>,
    compaction_statistics: CompactionStatistics,
    last_table_id: Option<TableId>,
    last_table_stats: TableStats,
    builder: CapacitySplitTableBuilder<F>,
    task_config: TaskConfig,
    task_progress: Arc<TaskProgress>,
    pk_prefix_skip_watermark_state: PkPrefixSkipWatermarkState,
    last_key_is_delete: bool,
    progress_key_num: u32,
    non_pk_prefix_skip_watermark_state: NonPkPrefixSkipWatermarkState,
    value_skip_watermark_state: ValueSkipWatermarkState,
    compaction_filter: C,
}

impl<F: TableBuilderFactory, C: CompactionFilter> CompactTaskExecutor<F, C> {
    pub fn new(
        builder: CapacitySplitTableBuilder<F>,
        task_config: TaskConfig,
        task_progress: Arc<TaskProgress>,
        pk_prefix_skip_watermark_state: PkPrefixSkipWatermarkState,
        non_pk_prefix_skip_watermark_state: NonPkPrefixSkipWatermarkState,
        value_skip_watermark_state: ValueSkipWatermarkState,
        compaction_filter: C,
    ) -> Self {
        Self {
            builder,
            task_config,
            last_key: FullKey::default(),
            last_key_is_delete: false,
            compaction_statistics: CompactionStatistics::default(),
            last_table_id: None,
            last_table_stats: TableStats::default(),
            task_progress,
            pk_prefix_skip_watermark_state,
            progress_key_num: 0,
            non_pk_prefix_skip_watermark_state,
            value_skip_watermark_state,
            compaction_filter,
        }
    }

    fn take_statistics(&mut self) -> CompactionStatistics {
        if let Some(last_table_id) = self.last_table_id.take() {
            self.compaction_statistics
                .delta_drop_stat
                .insert(last_table_id, std::mem::take(&mut self.last_table_stats));
        }
        std::mem::take(&mut self.compaction_statistics)
    }

    fn clear(&mut self) {
        if !self.last_key.is_empty() {
            self.last_key = FullKey::default();
        }
        self.last_key_is_delete = false;
    }

    #[inline(always)]
    fn may_report_process_key(&mut self, key_count: u32) {
        const PROGRESS_KEY_INTERVAL: u32 = 100;
        self.progress_key_num += key_count;
        if self.progress_key_num > PROGRESS_KEY_INTERVAL {
            self.task_progress
                .inc_progress_key(self.progress_key_num as u64);
            self.progress_key_num = 0;
        }
    }
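    /// Consumes `iter` up to and including `target_key`, deciding for each entry whether it is
    /// dropped (delete-key GC, stale versions, compaction filter, watermarks) or appended to the
    /// sstable builder, and updates the drop statistics accordingly.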
    pub async fn run(
        &mut self,
        iter: &mut BlockIterator,
        target_key: FullKey<&[u8]>,
    ) -> HummockResult<()> {
        self.pk_prefix_skip_watermark_state.reset_watermark();
        self.non_pk_prefix_skip_watermark_state.reset_watermark();

        while iter.is_valid() && iter.key().le(&target_key) {
            let is_new_user_key =
                !self.last_key.is_empty() && iter.key().user_key != self.last_key.user_key.as_ref();
            self.compaction_statistics.iter_total_key_counts += 1;
            self.may_report_process_key(1);

            let mut drop = false;
            let value = HummockValue::from_slice(iter.value()).unwrap();
            let is_first_or_new_user_key = is_new_user_key || self.last_key.is_empty();
            if is_first_or_new_user_key {
                self.last_key.set(iter.key());
                self.last_key_is_delete = false;
            }

            if !self.task_config.retain_multiple_version
                && self.task_config.gc_delete_keys
                && value.is_delete()
            {
                drop = true;
                self.last_key_is_delete = true;
            } else if !self.task_config.retain_multiple_version && !is_first_or_new_user_key {
                drop = true;
            }

            if !drop && self.compaction_filter.should_delete(iter.key()) {
                drop = true;
            }

            if !drop && self.watermark_should_delete(&iter.key(), value) {
                drop = true;
                self.last_key_is_delete = true;
            }

            if self.last_table_id != Some(self.last_key.user_key.table_id) {
                if let Some(last_table_id) = self.last_table_id.take() {
                    self.compaction_statistics
                        .delta_drop_stat
                        .insert(last_table_id, std::mem::take(&mut self.last_table_stats));
                }
                self.last_table_id = Some(self.last_key.user_key.table_id);
            }

            if drop {
                self.compaction_statistics.iter_drop_key_counts += 1;

                let should_count = match self.task_config.stats_target_table_ids.as_ref() {
                    Some(target_table_ids) => {
                        target_table_ids.contains(&self.last_key.user_key.table_id)
                    }
                    None => true,
                };
                if should_count {
                    self.last_table_stats.total_key_count -= 1;
                    self.last_table_stats.total_key_size -= self.last_key.encoded_len() as i64;
                    self.last_table_stats.total_value_size -= value.encoded_len() as i64;
                }
                iter.next();
                continue;
            }
            self.builder
                .add_full_key(iter.key(), value, is_new_user_key)
                .await?;
            iter.next();
        }
        Ok(())
    }
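    /// Returns `true` if the block starting at `smallest_key` may be copied into the output
    /// without decoding: the previous key must not be a delete tombstone of the same user key,
    /// and neither the watermarks nor the compaction filter may require dropping its first key.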
    pub fn shall_copy_raw_block(&mut self, smallest_key: &FullKey<&[u8]>) -> bool {
        if self.last_key_is_delete && self.last_key.user_key.as_ref().eq(&smallest_key.user_key) {
            return false;
        }

        if self.watermark_may_delete(smallest_key) {
            return false;
        }

        if self.compaction_filter.should_delete(*smallest_key) {
            return false;
        }

        true
    }

    /// Conservatively checks whether any table watermark might drop `key`. The real value is not
    /// available at this point, so a dummy `Put` value is used for the prefix-watermark checks.
    fn watermark_may_delete(&mut self, key: &FullKey<&[u8]>) -> bool {
        let pk_prefix_has_watermark = self.pk_prefix_skip_watermark_state.has_watermark();
        let non_pk_prefix_has_watermark = self.non_pk_prefix_skip_watermark_state.has_watermark();
        if pk_prefix_has_watermark || non_pk_prefix_has_watermark {
            let unused = vec![];
            let unused_put = HummockValue::Put(unused.as_slice());
            if (pk_prefix_has_watermark
                && self
                    .pk_prefix_skip_watermark_state
                    .should_delete(key, unused_put))
                || (non_pk_prefix_has_watermark
                    && self
                        .non_pk_prefix_skip_watermark_state
                        .should_delete(key, unused_put))
            {
                return true;
            }
        }
        self.value_skip_watermark_state.has_watermark()
            && self.value_skip_watermark_state.may_delete(key)
    }

    /// Returns `true` if any of the watermark states decides that this key-value pair should be
    /// dropped.
    fn watermark_should_delete(
        &mut self,
        key: &FullKey<&[u8]>,
        value: HummockValue<&[u8]>,
    ) -> bool {
        (self.pk_prefix_skip_watermark_state.has_watermark()
            && self
                .pk_prefix_skip_watermark_state
                .should_delete(key, value))
            || (self.non_pk_prefix_skip_watermark_state.has_watermark()
                && self
                    .non_pk_prefix_skip_watermark_state
                    .should_delete(key, value))
            || (self.value_skip_watermark_state.has_watermark()
                && self.value_skip_watermark_state.should_delete(key, value))
    }
}