1use std::cmp::Ordering;
16use std::collections::HashSet;
17use std::marker::PhantomData;
18use std::sync::atomic::AtomicU64;
19use std::sync::{Arc, atomic};
20use std::time::Instant;
21
22use await_tree::{InstrumentAwait, SpanExt};
23use bytes::Bytes;
24use fail::fail_point;
25use itertools::Itertools;
26use risingwave_common::catalog::TableId;
27use risingwave_hummock_sdk::compact_task::CompactTask;
28use risingwave_hummock_sdk::key::FullKey;
29use risingwave_hummock_sdk::key_range::KeyRange;
30use risingwave_hummock_sdk::sstable_info::SstableInfo;
31use risingwave_hummock_sdk::table_stats::TableStats;
32use risingwave_hummock_sdk::{EpochWithGap, LocalSstableInfo, can_concat, compact_task_to_string};
33
34use crate::compaction_catalog_manager::CompactionCatalogAgentRef;
35use crate::hummock::block_stream::BlockDataStream;
36use crate::hummock::compactor::task_progress::TaskProgress;
37use crate::hummock::compactor::{
38 CompactionFilter, CompactionStatistics, Compactor, CompactorContext, MultiCompactionFilter,
39 RemoteBuilderFactory, TaskConfig,
40};
41use crate::hummock::iterator::{
42 NonPkPrefixSkipWatermarkState, PkPrefixSkipWatermarkState, SkipWatermarkState,
43 ValueSkipWatermarkState,
44};
45use crate::hummock::multi_builder::{CapacitySplitTableBuilder, TableBuilderFactory};
46use crate::hummock::sstable_store::SstableStoreRef;
47use crate::hummock::value::HummockValue;
48use crate::hummock::{
49 Block, BlockBuilder, BlockHolder, BlockIterator, BlockMeta, BlockedXor16FilterBuilder,
50 CachePolicy, CompressionAlgorithm, GetObjectId, HummockResult, SstableBuilderOptions,
51 TableHolder, UnifiedSstableWriterFactory,
52};
53use crate::monitor::{CompactorMetrics, StoreLocalStatistic};
54
/// Iterator over the blocks of a single SST, downloading them sequentially
/// from object storage via a `BlockDataStream` and optionally exposing the
/// current block through a `BlockIterator`.
pub struct BlockStreamIterator {
    /// The underlying block stream. Lazily created by the first
    /// `download_next_block` call, and dropped/recreated on retryable IO
    /// errors so reads resume from `next_block_index`.
    block_stream: Option<BlockDataStream>,

    /// Index of the next block to download from the stream.
    next_block_index: usize,

    /// Handle of the SST being streamed; source of block metas and the raw
    /// filter reader.
    sstable: TableHolder,
    /// Iterator over the most recently decoded block, if `init_block_iter`
    /// has been called for it.
    iter: Option<BlockIterator>,
    /// Task progress reporter; its pending-read-IO counter is decremented in
    /// this type's `Drop` impl.
    task_progress: Arc<TaskProgress>,

    /// Store used to (re)open the block stream.
    sstable_store: SstableStoreRef,
    /// Info of the SST being read (object id, sst id, meta offset, ...).
    sstable_info: SstableInfo,
    /// Number of stream-recreation retries performed so far.
    io_retry_times: usize,
    /// Maximum retries before an object-store error is surfaced to the caller.
    max_io_retry_times: usize,
    /// Shared accumulator of remote IO time, in milliseconds.
    stats_ptr: Arc<AtomicU64>,
}
74
impl BlockStreamIterator {
    /// Constructs a new `BlockStreamIterator` for the given SST.
    ///
    /// No IO happens here: the block stream is opened lazily by the first
    /// call to `download_next_block`.
    pub fn new(
        sstable: TableHolder,
        task_progress: Arc<TaskProgress>,
        sstable_store: SstableStoreRef,
        sstable_info: SstableInfo,
        max_io_retry_times: usize,
        stats_ptr: Arc<AtomicU64>,
    ) -> Self {
        Self {
            block_stream: None,
            next_block_index: 0,
            sstable,
            iter: None,
            task_progress,
            sstable_store,
            sstable_info,
            io_retry_times: 0,
            max_io_retry_times,
            stats_ptr,
        }
    }

    /// Opens a block stream starting at `next_block_index`, so that a stream
    /// recreated after an IO error resumes from the first block that has not
    /// been returned yet.
    async fn create_stream(&mut self) -> HummockResult<()> {
        let block_stream = self
            .sstable_store
            .get_stream_for_blocks(
                self.sstable_info.object_id,
                // Only request the blocks that remain to be read.
                &self.sstable.meta.block_metas[self.next_block_index..],
            )
            .instrument_await("stream_iter_get_stream".verbose())
            .await?;
        self.block_stream = Some(block_stream);
        Ok(())
    }

    /// Downloads the next block of the SST.
    ///
    /// Returns the raw (possibly still compressed) block bytes, the block's
    /// raw filter bytes, and its `BlockMeta`; returns `Ok(None)` once the
    /// stream is exhausted. Object-store errors are retried up to
    /// `max_io_retry_times` times by recreating the stream; other errors are
    /// returned immediately.
    pub(crate) async fn download_next_block(
        &mut self,
    ) -> HummockResult<Option<(Bytes, Vec<u8>, BlockMeta)>> {
        let now = Instant::now();
        // On scope exit (success or error), fold the elapsed wall time in
        // milliseconds into the shared remote-IO-time counter.
        let _time_stat = scopeguard::guard(self.stats_ptr.clone(), |stats_ptr: Arc<AtomicU64>| {
            let add = (now.elapsed().as_secs_f64() * 1000.0).ceil();
            stats_ptr.fetch_add(add as u64, atomic::Ordering::Relaxed);
        });
        loop {
            let ret = match &mut self.block_stream {
                Some(block_stream) => block_stream.next_block_impl().await,
                None => {
                    // Lazily (re)create the stream, then retry the read.
                    self.create_stream().await?;
                    continue;
                }
            };
            match ret {
                Ok(Some((data, _))) => {
                    let meta = self.sstable.meta.block_metas[self.next_block_index].clone();
                    let filter_block = self
                        .sstable
                        .filter_reader
                        .get_block_raw_filter(self.next_block_index);
                    self.next_block_index += 1;
                    return Ok(Some((data, filter_block, meta)));
                }

                Ok(None) => break,

                Err(e) => {
                    // Only object-store errors are considered transient; give
                    // up once the retry budget is exhausted.
                    if !e.is_object_error() || self.io_retry_times >= self.max_io_retry_times {
                        return Err(e);
                    }

                    // Drop the broken stream; the next loop iteration
                    // recreates it from `next_block_index`.
                    self.block_stream.take();
                    self.io_retry_times += 1;
                    fail_point!("create_stream_err");

                    tracing::warn!(
                        "fast compact retry create stream for sstable {} times, sstinfo={}",
                        self.io_retry_times,
                        format!(
                            "object_id={}, sst_id={}, meta_offset={}, table_ids={:?}",
                            self.sstable_info.object_id,
                            self.sstable_info.sst_id,
                            self.sstable_info.meta_offset,
                            self.sstable_info.table_ids
                        )
                    );
                }
            }
        }

        // Stream exhausted: mark every block consumed and drop any leftover
        // block iterator so that `is_valid` becomes false.
        self.next_block_index = self.sstable.meta.block_metas.len();
        self.iter.take();
        Ok(None)
    }

    /// Decodes `buf` into a block and installs a `BlockIterator` positioned
    /// at the block's first entry.
    pub(crate) fn init_block_iter(
        &mut self,
        buf: Bytes,
        uncompressed_capacity: usize,
    ) -> HummockResult<()> {
        let block = Block::decode(buf, uncompressed_capacity)?;
        let mut iter = BlockIterator::new(BlockHolder::from_owned_block(Box::new(block)));
        iter.seek_to_first();
        self.iter = Some(iter);
        Ok(())
    }

    /// Smallest key of the next (not yet downloaded) block.
    fn next_block_smallest(&self) -> &[u8] {
        self.sstable.meta.block_metas[self.next_block_index]
            .smallest_key
            .as_ref()
    }

    /// Upper bound of the next block's key range: the smallest key of the
    /// block after it, or the SST's largest key for the last block.
    fn next_block_largest(&self) -> &[u8] {
        if self.next_block_index + 1 < self.sstable.meta.block_metas.len() {
            self.sstable.meta.block_metas[self.next_block_index + 1]
                .smallest_key
                .as_ref()
        } else {
            self.sstable.meta.largest_key.as_ref()
        }
    }

    /// Largest key attributed to the block just downloaded (the one before
    /// `next_block_index`): the next block's smallest key with its epoch
    /// raised to the max epoch, or the SST's largest key at the end.
    fn current_block_largest(&self) -> Vec<u8> {
        if self.next_block_index < self.sstable.meta.block_metas.len() {
            let mut largest_key = FullKey::decode(
                self.sstable.meta.block_metas[self.next_block_index]
                    .smallest_key
                    .as_ref(),
            );
            // The largest key of the current block is at most equal to the
            // next block's smallest user key; use the max epoch so it sorts
            // before any real version of that user key.
            largest_key.epoch_with_gap = EpochWithGap::new_max_epoch();
            largest_key.encode()
        } else {
            self.sstable.meta.largest_key.clone()
        }
    }

    /// Current key: the block iterator's key when one is active, otherwise
    /// the smallest key of the next block to download.
    fn key(&self) -> FullKey<&[u8]> {
        match self.iter.as_ref() {
            Some(iter) => iter.key(),
            None => FullKey::decode(
                self.sstable.meta.block_metas[self.next_block_index]
                    .smallest_key
                    .as_ref(),
            ),
        }
    }

    /// True while a block iterator is active or undownloaded blocks remain.
    pub(crate) fn is_valid(&self) -> bool {
        self.iter.is_some() || self.next_block_index < self.sstable.meta.block_metas.len()
    }

    #[cfg(test)]
    #[cfg(feature = "failpoints")]
    pub(crate) fn iter_mut(&mut self) -> &mut BlockIterator {
        self.iter.as_mut().unwrap()
    }
}
250
impl Drop for BlockStreamIterator {
    fn drop(&mut self) {
        // Balance the `inc_num_pending_read_io` performed by the code that
        // created this iterator (see `ConcatSstableIterator::seek_idx`).
        self.task_progress.dec_num_pending_read_io();
    }
}
256
/// Iterator over the blocks of a run of SSTs, reading one SST at a time via a
/// `BlockStreamIterator`.
pub struct ConcatSstableIterator {
    /// Stream iterator of the SST currently being read, if any.
    sstable_iter: Option<BlockStreamIterator>,

    /// Index into `sstables` of the current SST.
    cur_idx: usize,

    /// The SSTs to iterate, in order.
    sstables: Vec<SstableInfo>,

    /// Store used to fetch SST metas and open block streams.
    sstable_store: SstableStoreRef,

    /// Local statistics; its `remote_io_time` counter is shared with every
    /// `BlockStreamIterator` this iterator creates.
    stats: StoreLocalStatistic,
    /// Progress reporter forwarded to each `BlockStreamIterator`.
    task_progress: Arc<TaskProgress>,

    /// IO retry budget forwarded to each `BlockStreamIterator`.
    max_io_retry_times: usize,
}
276
277impl ConcatSstableIterator {
278 pub fn new(
282 sst_infos: Vec<SstableInfo>,
283 sstable_store: SstableStoreRef,
284 task_progress: Arc<TaskProgress>,
285 max_io_retry_times: usize,
286 ) -> Self {
287 Self {
288 sstable_iter: None,
289 cur_idx: 0,
290 sstables: sst_infos,
291 sstable_store,
292 task_progress,
293 stats: StoreLocalStatistic::default(),
294 max_io_retry_times,
295 }
296 }
297
298 pub async fn rewind(&mut self) -> HummockResult<()> {
299 self.seek_idx(0).await
300 }
301
302 pub async fn next_sstable(&mut self) -> HummockResult<()> {
303 self.seek_idx(self.cur_idx + 1).await
304 }
305
306 pub fn current_sstable(&mut self) -> &mut BlockStreamIterator {
307 self.sstable_iter.as_mut().unwrap()
308 }
309
310 pub async fn init_block_iter(&mut self) -> HummockResult<()> {
311 if let Some(sstable) = self.sstable_iter.as_mut() {
312 if sstable.iter.is_some() {
313 return Ok(());
314 }
315 let (buf, _, meta) = sstable.download_next_block().await?.unwrap();
316 sstable.init_block_iter(buf, meta.uncompressed_size as usize)?;
317 }
318 Ok(())
319 }
320
321 pub fn is_valid(&self) -> bool {
322 self.cur_idx < self.sstables.len()
323 }
324
325 async fn seek_idx(&mut self, idx: usize) -> HummockResult<()> {
327 self.sstable_iter.take();
328 self.cur_idx = idx;
329 if self.cur_idx < self.sstables.len() {
330 let sstable_info = &self.sstables[self.cur_idx];
331 let sstable = self
332 .sstable_store
333 .sstable(sstable_info, &mut self.stats)
334 .instrument_await("stream_iter_sstable".verbose())
335 .await?;
336 self.task_progress.inc_num_pending_read_io();
337
338 let sstable_iter = BlockStreamIterator::new(
339 sstable,
340 self.task_progress.clone(),
341 self.sstable_store.clone(),
342 sstable_info.clone(),
343 self.max_io_retry_times,
344 self.stats.remote_io_time.clone(),
345 );
346 self.sstable_iter = Some(sstable_iter);
347 }
348 Ok(())
349 }
350}
351
/// Runner of the fast compaction path. It merges the two input levels of a
/// `CompactTask`, copying raw blocks into the output whenever a whole block
/// can be taken verbatim and only decoding blocks whose key ranges interleave
/// with the other side.
pub struct CompactorRunner<C: CompactionFilter = MultiCompactionFilter> {
    /// Iterator over the SSTs of the first input level (`input_ssts[0]`).
    left: Box<ConcatSstableIterator>,
    /// Iterator over the SSTs of the second input level (`input_ssts[1]`).
    right: Box<ConcatSstableIterator>,
    /// Compact task id, used for logging.
    task_id: u64,
    /// Executor that applies drop/keep rules per key and feeds the SST
    /// builder (both key-by-key and via raw block copies).
    executor: CompactTaskExecutor<
        RemoteBuilderFactory<UnifiedSstableWriterFactory, BlockedXor16FilterBuilder>,
        C,
    >,
    /// Target compression algorithm for output SSTs.
    compression_algorithm: CompressionAlgorithm,
    /// Compactor metrics sink (raw-block-copy bytes, progress reporting).
    metrics: Arc<CompactorMetrics>,
}
363
impl<C: CompactionFilter> CompactorRunner<C> {
    /// Builds a fast-compaction runner from a `CompactTask`.
    ///
    /// Panics if the task does not have exactly two input levels. Sets up the
    /// SST builder chain, both input iterators and the watermark states, and
    /// wires them into a `CompactTaskExecutor`.
    pub fn new(
        context: CompactorContext,
        task: CompactTask,
        compaction_catalog_agent_ref: CompactionCatalogAgentRef,
        object_id_getter: Arc<dyn GetObjectId>,
        task_progress: Arc<TaskProgress>,
        compaction_filter: C,
    ) -> Self {
        let mut options: SstableBuilderOptions = context.storage_opts.as_ref().into();
        let compression_algorithm: CompressionAlgorithm = task.compression_algorithm.into();
        options.compression_algorithm = compression_algorithm;
        options.capacity = task.target_file_size as usize;
        options.max_vnode_key_range_bytes = None;
        let get_id_time = Arc::new(AtomicU64::new(0));

        // The fast path always covers the full key range.
        let key_range = KeyRange::inf();

        let task_config = TaskConfig {
            key_range,
            cache_policy: CachePolicy::NotFill,
            gc_delete_keys: task.gc_delete_keys,
            // Only the newest version of each user key is kept.
            retain_multiple_version: false,
            stats_target_table_ids: Some(HashSet::from_iter(task.existing_table_ids.clone())),
            table_vnode_partition: task.table_vnode_partition.clone(),
            use_block_based_filter: true,
            table_schemas: Default::default(),
            disable_drop_column_optimization: false,
        };
        let factory = UnifiedSstableWriterFactory::new(context.sstable_store.clone());

        let builder_factory = RemoteBuilderFactory::<_, BlockedXor16FilterBuilder> {
            object_id_getter,
            limiter: context.memory_limiter.clone(),
            options,
            policy: task_config.cache_policy,
            remote_rpc_cost: get_id_time,
            compaction_catalog_agent_ref: compaction_catalog_agent_ref.clone(),
            sstable_writer_factory: factory,
            _phantom: PhantomData,
        };
        let sst_builder = CapacitySplitTableBuilder::new(
            builder_factory,
            context.compactor_metrics.clone(),
            Some(task_progress.clone()),
            task_config.table_vnode_partition.clone(),
            context
                .storage_opts
                .compactor_concurrent_uploading_sst_count,
            compaction_catalog_agent_ref.clone(),
        );
        // The fast path only supports merging exactly two input levels.
        assert_eq!(
            task.input_ssts.len(),
            2,
            "TaskId {} target_level {:?} task {:?}",
            task.task_id,
            task.target_level,
            compact_task_to_string(&task)
        );
        let left = Box::new(ConcatSstableIterator::new(
            task.input_ssts[0].table_infos.clone(),
            context.sstable_store.clone(),
            task_progress.clone(),
            context.storage_opts.compactor_iter_max_io_retry_times,
        ));
        let right = Box::new(ConcatSstableIterator::new(
            task.input_ssts[1].table_infos.clone(),
            context.sstable_store,
            task_progress.clone(),
            context.storage_opts.compactor_iter_max_io_retry_times,
        ));

        // Watermark states used to drop keys below table watermarks.
        let pk_prefix_state = PkPrefixSkipWatermarkState::from_safe_epoch_watermarks(
            task.pk_prefix_table_watermarks.clone(),
        );
        let non_pk_prefix_state = NonPkPrefixSkipWatermarkState::from_safe_epoch_watermarks(
            task.non_pk_prefix_table_watermarks.clone(),
            compaction_catalog_agent_ref.clone(),
        );
        let value_skip_watermark_state = ValueSkipWatermarkState::from_safe_epoch_watermarks(
            task.value_table_watermarks.clone(),
            compaction_catalog_agent_ref,
        );

        let compact_table_ids = HashSet::from_iter(task.build_compact_table_ids());

        Self {
            executor: CompactTaskExecutor::new(
                sst_builder,
                task_config,
                task_progress,
                pk_prefix_state,
                non_pk_prefix_state,
                value_skip_watermark_state,
                compaction_filter,
                compact_table_ids,
            ),
            left,
            right,
            task_id: task.task_id,
            metrics: context.compactor_metrics,
            compression_algorithm,
        }
    }

    /// Runs the merge.
    ///
    /// While both sides are valid, the side with the smaller key is advanced:
    /// whole blocks strictly below the other side's key are copied raw when
    /// allowed by `shall_copy_raw_block`; otherwise the block is decoded and
    /// merged key by key. The remaining side is then drained the same way.
    /// Returns the output SSTs and the compaction statistics.
    pub async fn run(mut self) -> HummockResult<(Vec<LocalSstableInfo>, CompactionStatistics)> {
        self.left.rewind().await?;
        self.right.rewind().await?;
        let mut skip_raw_block_count = 0;
        let mut skip_raw_block_size = 0;
        while self.left.is_valid() && self.right.is_valid() {
            let ret = self
                .left
                .current_sstable()
                .key()
                .cmp(&self.right.current_sstable().key());
            // `first` is the side with the smaller current key.
            let (first, second) = if ret == Ordering::Less {
                (&mut self.left, &mut self.right)
            } else {
                (&mut self.right, &mut self.left)
            };
            // Equal full keys across the two levels would mean overlapping
            // SST ranges, which the fast path does not support.
            assert!(
                ret != Ordering::Equal,
                "sst range overlap equal_key {:?}",
                self.left.current_sstable().key()
            );
            if first.current_sstable().iter.is_none() {
                // No block is being decoded on `first`: try to copy whole
                // raw blocks while they stay strictly below `second`'s key.
                let right_key = second.current_sstable().key();
                while first.current_sstable().is_valid() && !self.executor.builder.need_flush() {
                    let full_key = FullKey::decode(first.current_sstable().next_block_largest());
                    // Stop once the next block could overlap the other side.
                    if full_key.user_key.ge(&right_key.user_key) {
                        break;
                    }
                    let smallest_key =
                        FullKey::decode(first.current_sstable().next_block_smallest());
                    if !self.executor.shall_copy_raw_block(&smallest_key) {
                        break;
                    }
                    let smallest_key = smallest_key.to_vec();

                    let (mut block, filter_data, mut meta) = first
                        .current_sstable()
                        .download_next_block()
                        .await?
                        .unwrap();
                    // Re-compress uncompressed blocks when the output is
                    // supposed to be compressed.
                    let algorithm = Block::get_algorithm(&block)?;
                    if algorithm == CompressionAlgorithm::None
                        && algorithm != self.compression_algorithm
                    {
                        block = BlockBuilder::compress_block(block, self.compression_algorithm)?;
                        meta.len = block.len() as u32;
                    }

                    let largest_key = first.current_sstable().current_block_largest();
                    let block_len = block.len() as u64;
                    let block_key_count = meta.total_key_count;

                    if self
                        .executor
                        .builder
                        .add_raw_block(block, filter_data, smallest_key, largest_key, meta)
                        .await?
                    {
                        skip_raw_block_size += block_len;
                        skip_raw_block_count += 1;
                    }
                    self.executor.may_report_process_key(block_key_count);
                    self.executor.clear();
                }
                if !first.current_sstable().is_valid() {
                    first.next_sstable().await?;
                    continue;
                }
                // Could not copy more raw blocks: decode the next block for
                // key-by-key merging below.
                first.init_block_iter().await?;
            }

            // Merge keys from `first` up to (and including) `second`'s key.
            let target_key = second.current_sstable().key();
            let iter = first.sstable_iter.as_mut().unwrap().iter.as_mut().unwrap();
            self.executor.reset_watermark();
            self.executor.run(iter, target_key).await?;
            if !iter.is_valid() {
                // Current block consumed; drop it and advance the SST if the
                // whole SST is done.
                first.sstable_iter.as_mut().unwrap().iter.take();
                if !first.current_sstable().is_valid() {
                    first.next_sstable().await?;
                }
            }
        }
        // One side is exhausted; drain the other.
        let rest_data = if !self.left.is_valid() {
            &mut self.right
        } else {
            &mut self.left
        };
        if rest_data.is_valid() {
            // Finish the partially-consumed block (if any) key by key before
            // switching back to raw block copies.
            let sstable_iter = rest_data.sstable_iter.as_mut().unwrap();
            let target_key = FullKey::decode(&sstable_iter.sstable.meta.largest_key);
            if let Some(iter) = sstable_iter.iter.as_mut() {
                self.executor.reset_watermark();
                self.executor.run(iter, target_key).await?;
                assert!(
                    !iter.is_valid(),
                    "iter should not be valid key {:?}",
                    iter.key()
                );
            }
            sstable_iter.iter.take();
        }

        while rest_data.is_valid() {
            let mut sstable_iter = rest_data.sstable_iter.take().unwrap();
            while sstable_iter.is_valid() {
                let smallest_key = FullKey::decode(sstable_iter.next_block_smallest()).to_vec();
                let (block, filter_data, block_meta) =
                    sstable_iter.download_next_block().await?.unwrap();
                // If the last emitted key was a delete of the same user key,
                // this block must be merged key by key so the stale versions
                // are actually dropped.
                let need_deleted = self.executor.last_key.user_key.eq(&smallest_key.user_key)
                    && self.executor.last_key_is_delete;
                if self.executor.builder.need_flush()
                    || need_deleted
                    || !self.executor.shall_copy_raw_block(&smallest_key.to_ref())
                {
                    let largest_key = sstable_iter.sstable.meta.largest_key.clone();
                    let target_key = FullKey::decode(&largest_key);
                    sstable_iter.init_block_iter(block, block_meta.uncompressed_size as usize)?;
                    let mut iter = sstable_iter.iter.take().unwrap();
                    self.executor.reset_watermark();
                    self.executor.run(&mut iter, target_key).await?;
                } else {
                    let largest_key = sstable_iter.current_block_largest();
                    let block_len = block.len() as u64;
                    let block_key_count = block_meta.total_key_count;
                    if self
                        .executor
                        .builder
                        .add_raw_block(block, filter_data, smallest_key, largest_key, block_meta)
                        .await?
                    {
                        skip_raw_block_count += 1;
                        skip_raw_block_size += block_len;
                    }
                    self.executor.may_report_process_key(block_key_count);
                    self.executor.clear();
                }
            }
            rest_data.next_sstable().await?;
        }
        let mut total_read_bytes = 0;
        for sst in &self.left.sstables {
            total_read_bytes += sst.sst_size;
        }
        for sst in &self.right.sstables {
            total_read_bytes += sst.sst_size;
        }
        self.metrics
            .compact_fast_runner_bytes
            .inc_by(skip_raw_block_size);
        // NOTE(review): if both input levels report zero `sst_size`,
        // `total_read_bytes` is 0 and this division panics — TODO confirm
        // callers guarantee non-empty input sizes.
        tracing::info!(
            "OPTIMIZATION: skip {} blocks for task-{}, optimize {}% data compression",
            skip_raw_block_count,
            self.task_id,
            skip_raw_block_size * 100 / total_read_bytes,
        );

        let statistic = self.executor.take_statistics();
        let output_ssts = self.executor.builder.finish().await?;
        Compactor::report_progress(
            self.metrics.clone(),
            Some(self.executor.task_progress.clone()),
            &output_ssts,
            false,
        );
        let sst_infos = output_ssts
            .iter()
            .map(|sst| sst.sst_info.clone())
            .collect_vec();
        // Output SSTs must form a non-overlapping, sorted run.
        assert!(can_concat(&sst_infos));
        Ok((output_ssts, statistic))
    }
}
647
/// Applies the per-key compaction rules (version retention, delete GC,
/// compaction filter, table watermarks) and feeds surviving keys into the SST
/// builder. Also tracks per-table drop statistics and task progress.
pub struct CompactTaskExecutor<F: TableBuilderFactory, C: CompactionFilter> {
    /// The last full key handled by `run`; used to detect new user keys and
    /// to decide whether a raw block may be copied.
    last_key: FullKey<Vec<u8>>,
    /// Accumulated iteration/drop counters and per-table delta stats.
    compaction_statistics: CompactionStatistics,
    /// Table id of `last_key`, used to flush per-table stats on table change.
    last_table_id: Option<TableId>,
    /// Drop statistics accumulated for `last_table_id`.
    last_table_stats: TableStats,
    /// The SST builder receiving surviving keys and raw blocks.
    builder: CapacitySplitTableBuilder<F>,
    /// Task-level configuration (GC, version retention, stats targets, ...).
    task_config: TaskConfig,
    /// Progress reporter for processed-key counts.
    task_progress: Arc<TaskProgress>,
    /// Watermark state for pk-prefix table watermarks.
    pk_prefix_skip_watermark_state: PkPrefixSkipWatermarkState,
    /// Whether the last handled version of `last_key` was a delete.
    last_key_is_delete: bool,
    /// Keys processed since the last progress report (flushed in batches).
    progress_key_num: u32,
    /// Watermark state for non-pk-prefix table watermarks.
    non_pk_prefix_skip_watermark_state: NonPkPrefixSkipWatermarkState,
    /// Watermark state for value table watermarks.
    value_skip_watermark_state: ValueSkipWatermarkState,
    /// User-provided compaction filter; keys it flags are dropped.
    compaction_filter: C,
    /// Table ids this task compacts; blocks of other tables are skipped.
    compact_table_ids: HashSet<TableId>,
}
664
impl<F: TableBuilderFactory, C: CompactionFilter> CompactTaskExecutor<F, C> {
    /// Creates an executor with empty statistics and no last-key state.
    pub fn new(
        builder: CapacitySplitTableBuilder<F>,
        task_config: TaskConfig,
        task_progress: Arc<TaskProgress>,
        pk_prefix_skip_watermark_state: PkPrefixSkipWatermarkState,
        non_pk_prefix_skip_watermark_state: NonPkPrefixSkipWatermarkState,
        value_skip_watermark_state: ValueSkipWatermarkState,
        compaction_filter: C,
        compact_table_ids: HashSet<TableId>,
    ) -> Self {
        Self {
            builder,
            task_config,
            last_key: FullKey::default(),
            last_key_is_delete: false,
            compaction_statistics: CompactionStatistics::default(),
            last_table_id: None,
            last_table_stats: TableStats::default(),
            task_progress,
            pk_prefix_skip_watermark_state,
            progress_key_num: 0,
            non_pk_prefix_skip_watermark_state,
            value_skip_watermark_state,
            compaction_filter,
            compact_table_ids,
        }
    }

    /// Flushes any pending per-table stats and returns (and resets) the
    /// accumulated compaction statistics.
    fn take_statistics(&mut self) -> CompactionStatistics {
        if let Some(last_table_id) = self.last_table_id.take() {
            self.compaction_statistics
                .delta_drop_stat
                .insert(last_table_id, std::mem::take(&mut self.last_table_stats));
        }
        std::mem::take(&mut self.compaction_statistics)
    }

    /// Resets the last-key tracking, e.g. after a raw block was copied and
    /// per-key state no longer reflects what was emitted.
    fn clear(&mut self) {
        if !self.last_key.is_empty() {
            self.last_key = FullKey::default();
        }
        self.last_key_is_delete = false;
    }

    /// Resets all watermark states before scanning a new key range.
    fn reset_watermark(&mut self) {
        self.pk_prefix_skip_watermark_state.reset_watermark();
        self.non_pk_prefix_skip_watermark_state.reset_watermark();
        self.value_skip_watermark_state.reset_watermark();
    }

    /// Whether a block of `table_id` is outside this task's scope and can be
    /// skipped entirely.
    #[inline(always)]
    fn should_skip_block(&self, table_id: TableId) -> bool {
        !self.compact_table_ids.contains(&table_id)
    }

    /// Accumulates processed keys and reports progress in batches to avoid
    /// a counter update per key.
    #[inline(always)]
    fn may_report_process_key(&mut self, key_count: u32) {
        const PROGRESS_KEY_INTERVAL: u32 = 100;
        self.progress_key_num += key_count;
        if self.progress_key_num > PROGRESS_KEY_INTERVAL {
            self.task_progress
                .inc_progress_key(self.progress_key_num as u64);
            self.progress_key_num = 0;
        }
    }

    /// Consumes keys from `iter` up to and including `target_key`, applying
    /// the drop rules and adding surviving keys to the builder.
    pub async fn run(
        &mut self,
        iter: &mut BlockIterator,
        target_key: FullKey<&[u8]>,
    ) -> HummockResult<()> {
        // Blocks of tables outside this task are dropped wholesale.
        if self.should_skip_block(iter.table_id()) {
            iter.finish_block();
            return Ok(());
        }

        // Note: `target_key` is inclusive.
        while iter.is_valid() && iter.key().le(&target_key) {
            let is_new_user_key =
                !self.last_key.is_empty() && iter.key().user_key != self.last_key.user_key.as_ref();
            self.compaction_statistics.iter_total_key_counts += 1;
            self.may_report_process_key(1);

            let mut drop = false;
            let value = HummockValue::from_slice(iter.value()).unwrap();
            let is_first_or_new_user_key = is_new_user_key || self.last_key.is_empty();
            if is_first_or_new_user_key {
                self.last_key.set(iter.key());
                self.last_key_is_delete = false;
            }

            // Drop rules (task_config.retain_multiple_version is false on the
            // fast path): GC tombstones of the newest version, drop older
            // versions of an already-seen user key.
            if !self.task_config.retain_multiple_version
                && self.task_config.gc_delete_keys
                && value.is_delete()
            {
                drop = true;
                self.last_key_is_delete = true;
            } else if !self.task_config.retain_multiple_version && !is_first_or_new_user_key {
                drop = true;
            }

            // User-provided compaction filter.
            if !drop && self.compaction_filter.should_delete(iter.key()) {
                drop = true;
            }

            // Table watermarks: dropped keys count as deletes so following
            // raw blocks with the same user key are not copied verbatim.
            if !drop && self.watermark_should_delete(&iter.key(), value) {
                drop = true;
                self.last_key_is_delete = true;
            }

            // Flush per-table stats when crossing a table boundary.
            if self.last_table_id != Some(self.last_key.user_key.table_id) {
                if let Some(last_table_id) = self.last_table_id.take() {
                    self.compaction_statistics
                        .delta_drop_stat
                        .insert(last_table_id, std::mem::take(&mut self.last_table_stats));
                }
                self.last_table_id = Some(self.last_key.user_key.table_id);
            }

            if drop {
                self.compaction_statistics.iter_drop_key_counts += 1;

                let should_count = match self.task_config.stats_target_table_ids.as_ref() {
                    Some(target_table_ids) => {
                        target_table_ids.contains(&self.last_key.user_key.table_id)
                    }
                    None => true,
                };
                if should_count {
                    // Dropped data is recorded as negative deltas.
                    self.last_table_stats.total_key_count -= 1;
                    self.last_table_stats.total_key_size -= self.last_key.encoded_len() as i64;
                    self.last_table_stats.total_value_size -= value.encoded_len() as i64;
                }
                iter.next();
                continue;
            }
            self.builder
                .add_full_key(iter.key(), value, is_new_user_key)
                .await?;
            iter.next();
        }
        Ok(())
    }

    /// Whether a whole block starting at `smallest_key` may be copied into
    /// the output without decoding. Returns `false` when the block's table is
    /// out of scope, when the previous emitted version of the same user key
    /// was a delete, when a watermark might drop keys in it, or when the
    /// compaction filter flags its first key.
    pub fn shall_copy_raw_block(&mut self, smallest_key: &FullKey<&[u8]>) -> bool {
        if self.should_skip_block(smallest_key.user_key.table_id) {
            return false;
        }

        if self.last_key_is_delete && self.last_key.user_key.as_ref().eq(&smallest_key.user_key) {
            return false;
        }

        if self.watermark_may_delete(smallest_key) {
            return false;
        }

        if self.compaction_filter.should_delete(*smallest_key) {
            return false;
        }

        true
    }

    /// Conservative watermark check used for raw block copies: returns `true`
    /// if any watermark state might drop `key` (or keys after it in the same
    /// block). Uses a dummy `Put` value for the pk-prefix states since only
    /// the key is inspected.
    fn watermark_may_delete(&mut self, key: &FullKey<&[u8]>) -> bool {
        let pk_prefix_has_watermark = self.pk_prefix_skip_watermark_state.has_watermark();
        let non_pk_prefix_has_watermark = self.non_pk_prefix_skip_watermark_state.has_watermark();
        if pk_prefix_has_watermark || non_pk_prefix_has_watermark {
            let unused = vec![];
            let unused_put = HummockValue::Put(unused.as_slice());
            if (pk_prefix_has_watermark
                && self
                    .pk_prefix_skip_watermark_state
                    .should_delete(key, unused_put))
                || (non_pk_prefix_has_watermark
                    && self
                        .non_pk_prefix_skip_watermark_state
                        .should_delete(key, unused_put))
            {
                return true;
            }
        }
        self.value_skip_watermark_state.has_watermark()
            && self.value_skip_watermark_state.may_delete(key)
    }

    /// Exact watermark check for a single key/value pair: `true` if any of
    /// the three watermark states says the pair should be dropped.
    fn watermark_should_delete(
        &mut self,
        key: &FullKey<&[u8]>,
        value: HummockValue<&[u8]>,
    ) -> bool {
        (self.pk_prefix_skip_watermark_state.has_watermark()
            && self
                .pk_prefix_skip_watermark_state
                .should_delete(key, value))
            || (self.non_pk_prefix_skip_watermark_state.has_watermark()
                && self
                    .non_pk_prefix_skip_watermark_state
                    .should_delete(key, value))
            || (self.value_skip_watermark_state.has_watermark()
                && self.value_skip_watermark_state.should_delete(key, value))
    }
}
873}