use std::cmp::Ordering;
use std::collections::HashSet;
use std::marker::PhantomData;
use std::sync::atomic::AtomicU64;
use std::sync::{Arc, atomic};
use std::time::Instant;

use await_tree::{InstrumentAwait, SpanExt};
use bytes::Bytes;
use fail::fail_point;
use itertools::Itertools;
use risingwave_common::catalog::TableId;
use risingwave_hummock_sdk::compact_task::CompactTask;
use risingwave_hummock_sdk::key::FullKey;
use risingwave_hummock_sdk::key_range::KeyRange;
use risingwave_hummock_sdk::sstable_info::SstableInfo;
use risingwave_hummock_sdk::table_stats::TableStats;
use risingwave_hummock_sdk::{EpochWithGap, LocalSstableInfo, can_concat, compact_task_to_string};

use crate::compaction_catalog_manager::CompactionCatalogAgentRef;
use crate::hummock::block_stream::BlockDataStream;
use crate::hummock::compactor::task_progress::TaskProgress;
use crate::hummock::compactor::{
    CompactionFilter, CompactionStatistics, Compactor, CompactorContext, MultiCompactionFilter,
    RemoteBuilderFactory, TaskConfig,
};
use crate::hummock::iterator::{
    NonPkPrefixSkipWatermarkState, PkPrefixSkipWatermarkState, SkipWatermarkState,
    ValueSkipWatermarkState,
};
use crate::hummock::multi_builder::{CapacitySplitTableBuilder, TableBuilderFactory};
use crate::hummock::sstable_store::SstableStoreRef;
use crate::hummock::value::HummockValue;
use crate::hummock::{
    Block, BlockBuilder, BlockHolder, BlockIterator, BlockMeta, BlockedXor16FilterBuilder,
    CachePolicy, CompressionAlgorithm, GetObjectId, HummockResult, SstableBuilderOptions,
    TableHolder, UnifiedSstableWriterFactory,
};
use crate::monitor::{CompactorMetrics, StoreLocalStatistic};

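/// Iterates over the blocks of one SST by streaming them from the sstable
/// store. At most one block is decoded at a time; blocks that are never
/// decoded can be copied into the output SST verbatim by the fast compactor.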
pub struct BlockStreamIterator {
    /// The downloading stream, created lazily on the first read.
    block_stream: Option<BlockDataStream>,

    /// Index of the next block to download.
    next_block_index: usize,

    /// The whole SST, used for block metadata and filter lookups.
    sstable: TableHolder,
    /// Iterator over the block currently being consumed, if any.
    iter: Option<BlockIterator>,
    task_progress: Arc<TaskProgress>,

    sstable_store: SstableStoreRef,
    /// Used to recreate the stream when a read is retried.
    sstable_info: SstableInfo,
    io_retry_times: usize,
    max_io_retry_times: usize,
    stats_ptr: Arc<AtomicU64>,
}

impl BlockStreamIterator {
    /// The caller is expected to have incremented `task_progress`'s pending
    /// read-IO count; it is decremented again when the iterator is dropped.
    pub fn new(
        sstable: TableHolder,
        task_progress: Arc<TaskProgress>,
        sstable_store: SstableStoreRef,
        sstable_info: SstableInfo,
        max_io_retry_times: usize,
        stats_ptr: Arc<AtomicU64>,
    ) -> Self {
        Self {
            block_stream: None,
            next_block_index: 0,
            sstable,
            iter: None,
            task_progress,
            sstable_store,
            sstable_info,
            io_retry_times: 0,
            max_io_retry_times,
            stats_ptr,
        }
    }

    async fn create_stream(&mut self) -> HummockResult<()> {
        // Start the stream at the first block that has not been consumed yet,
        // so that a retried stream resumes where the previous one failed.
        let block_stream = self
            .sstable_store
            .get_stream_for_blocks(
                self.sstable_info.object_id,
                &self.sstable.meta.block_metas[self.next_block_index..],
            )
            .instrument_await("stream_iter_get_stream".verbose())
            .await?;
        self.block_stream = Some(block_stream);
        Ok(())
    }

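    /// Downloads the next block and returns its raw data, raw filter data and
    /// metadata. Transient object-store errors are retried by recreating the
    /// stream, up to `max_io_retry_times`. Returns `Ok(None)` once the SST is
    /// exhausted.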
    pub(crate) async fn download_next_block(
        &mut self,
    ) -> HummockResult<Option<(Bytes, Vec<u8>, BlockMeta)>> {
        // Record the time spent in this call into `stats_ptr` (in milliseconds)
        // when the guard goes out of scope.
        let now = Instant::now();
        let _time_stat = scopeguard::guard(self.stats_ptr.clone(), |stats_ptr: Arc<AtomicU64>| {
            let add = (now.elapsed().as_secs_f64() * 1000.0).ceil();
            stats_ptr.fetch_add(add as u64, atomic::Ordering::Relaxed);
        });
        loop {
            let ret = match &mut self.block_stream {
                Some(block_stream) => block_stream.next_block_impl().await,
                None => {
                    self.create_stream().await?;
                    continue;
                }
            };
            match ret {
                Ok(Some((data, _))) => {
                    let meta = self.sstable.meta.block_metas[self.next_block_index].clone();
                    let filter_block = self
                        .sstable
                        .filter_reader
                        .get_block_raw_filter(self.next_block_index);
                    self.next_block_index += 1;
                    return Ok(Some((data, filter_block, meta)));
                }

                Ok(None) => break,

                Err(e) => {
                    // Only retry transient object-store errors, and at most
                    // `max_io_retry_times` times.
                    if !e.is_object_error() || self.io_retry_times >= self.max_io_retry_times {
                        return Err(e);
                    }

                    self.block_stream.take();
                    self.io_retry_times += 1;
                    fail_point!("create_stream_err");

                    tracing::warn!(
                        "fast compact retry create stream for sstable {} times, sstinfo={}",
                        self.io_retry_times,
                        format!(
                            "object_id={}, sst_id={}, meta_offset={}, table_ids={:?}",
                            self.sstable_info.object_id,
                            self.sstable_info.sst_id,
                            self.sstable_info.meta_offset,
                            self.sstable_info.table_ids
                        )
                    );
                }
            }
        }

        self.next_block_index = self.sstable.meta.block_metas.len();
        self.iter.take();
        Ok(None)
    }

    pub(crate) fn init_block_iter(
        &mut self,
        buf: Bytes,
        uncompressed_capacity: usize,
    ) -> HummockResult<()> {
        let block = Block::decode(buf, uncompressed_capacity)?;
        let mut iter = BlockIterator::new(BlockHolder::from_owned_block(Box::new(block)));
        iter.seek_to_first();
        self.iter = Some(iter);
        Ok(())
    }

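    /// Returns the smallest key of the block that would be downloaded next.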
    fn next_block_smallest(&self) -> &[u8] {
        self.sstable.meta.block_metas[self.next_block_index]
            .smallest_key
            .as_ref()
    }

    /// Returns the largest key bound of the next block: the smallest key of
    /// the block after it, or the SST's largest key for the last block.
    fn next_block_largest(&self) -> &[u8] {
        if self.next_block_index + 1 < self.sstable.meta.block_metas.len() {
            self.sstable.meta.block_metas[self.next_block_index + 1]
                .smallest_key
                .as_ref()
        } else {
            self.sstable.meta.largest_key.as_ref()
        }
    }

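    /// Returns the largest key bound of the block that was just downloaded:
    /// the smallest key of the following block (with its epoch raised to MAX
    /// so that key itself is excluded), or the SST's largest key for the
    /// final block.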
    fn current_block_largest(&self) -> Vec<u8> {
        if self.next_block_index < self.sstable.meta.block_metas.len() {
            let mut largest_key = FullKey::decode(
                self.sstable.meta.block_metas[self.next_block_index]
                    .smallest_key
                    .as_ref(),
            );
            // Use MAX epoch so this bound does not include the smallest key of
            // the next block itself.
            largest_key.epoch_with_gap = EpochWithGap::new_max_epoch();
            largest_key.encode()
        } else {
            self.sstable.meta.largest_key.clone()
        }
    }

    /// Returns the current key: the block iterator's key if a block is open,
    /// otherwise the smallest key of the next block to download.
    fn key(&self) -> FullKey<&[u8]> {
        match self.iter.as_ref() {
            Some(iter) => iter.key(),
            None => FullKey::decode(
                self.sstable.meta.block_metas[self.next_block_index]
                    .smallest_key
                    .as_ref(),
            ),
        }
    }

    pub(crate) fn is_valid(&self) -> bool {
        self.iter.is_some() || self.next_block_index < self.sstable.meta.block_metas.len()
    }

    #[cfg(test)]
    #[cfg(feature = "failpoints")]
    pub(crate) fn iter_mut(&mut self) -> &mut BlockIterator {
        self.iter.as_mut().unwrap()
    }
}

impl Drop for BlockStreamIterator {
    fn drop(&mut self) {
        self.task_progress.dec_num_pending_read_io();
    }
}

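/// Iterates over a sorted run of non-overlapping SSTs, opening one
/// `BlockStreamIterator` at a time.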
pub struct ConcatSstableIterator {
    /// The iterator over the current SST.
    sstable_iter: Option<BlockStreamIterator>,

    /// Index of the current SST in `sstables`.
    cur_idx: usize,

    /// All non-overlapping SSTs, in ascending key order.
    sstables: Vec<SstableInfo>,

    sstable_store: SstableStoreRef,

    stats: StoreLocalStatistic,
    task_progress: Arc<TaskProgress>,

    max_io_retry_times: usize,
}

impl ConcatSstableIterator {
    /// Caller should ensure that `sst_infos` are non-overlapping and sorted in
    /// ascending key order.
    pub fn new(
        sst_infos: Vec<SstableInfo>,
        sstable_store: SstableStoreRef,
        task_progress: Arc<TaskProgress>,
        max_io_retry_times: usize,
    ) -> Self {
        Self {
            sstable_iter: None,
            cur_idx: 0,
            sstables: sst_infos,
            sstable_store,
            task_progress,
            stats: StoreLocalStatistic::default(),
            max_io_retry_times,
        }
    }

    pub async fn rewind(&mut self) -> HummockResult<()> {
        self.seek_idx(0).await
    }

    pub async fn next_sstable(&mut self) -> HummockResult<()> {
        self.seek_idx(self.cur_idx + 1).await
    }

    pub fn current_sstable(&mut self) -> &mut BlockStreamIterator {
        self.sstable_iter.as_mut().unwrap()
    }

    pub async fn init_block_iter(&mut self) -> HummockResult<()> {
        if let Some(sstable) = self.sstable_iter.as_mut() {
            if sstable.iter.is_some() {
                return Ok(());
            }
            let (buf, _, meta) = sstable.download_next_block().await?.unwrap();
            sstable.init_block_iter(buf, meta.uncompressed_size as usize)?;
        }
        Ok(())
    }

    pub fn is_valid(&self) -> bool {
        self.cur_idx < self.sstables.len()
    }

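    /// Drops the current SST iterator and repositions to the SST at `idx`,
    /// opening a new block stream if the index is in range.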
    async fn seek_idx(&mut self, idx: usize) -> HummockResult<()> {
        self.sstable_iter.take();
        self.cur_idx = idx;
        if self.cur_idx < self.sstables.len() {
            let sstable_info = &self.sstables[self.cur_idx];
            let sstable = self
                .sstable_store
                .sstable(sstable_info, &mut self.stats)
                .instrument_await("stream_iter_sstable".verbose())
                .await?;
            self.task_progress.inc_num_pending_read_io();

            let sstable_iter = BlockStreamIterator::new(
                sstable,
                self.task_progress.clone(),
                self.sstable_store.clone(),
                sstable_info.clone(),
                self.max_io_retry_times,
                self.stats.remote_io_time.clone(),
            );
            self.sstable_iter = Some(sstable_iter);
        }
        Ok(())
    }
}

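/// Compaction runner for the fast path: merges exactly two non-overlapping
/// sorted runs, copying whole raw blocks into the output wherever one side
/// does not overlap the other, and falling back to key-by-key merging only
/// for overlapping ranges.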
pub struct CompactorRunner<C: CompactionFilter = MultiCompactionFilter> {
    left: Box<ConcatSstableIterator>,
    right: Box<ConcatSstableIterator>,
    task_id: u64,
    executor: CompactTaskExecutor<
        RemoteBuilderFactory<UnifiedSstableWriterFactory, BlockedXor16FilterBuilder>,
        C,
    >,
    compression_algorithm: CompressionAlgorithm,
    metrics: Arc<CompactorMetrics>,
}

impl<C: CompactionFilter> CompactorRunner<C> {
    pub fn new(
        context: CompactorContext,
        task: CompactTask,
        compaction_catalog_agent_ref: CompactionCatalogAgentRef,
        object_id_getter: Arc<dyn GetObjectId>,
        task_progress: Arc<TaskProgress>,
        compaction_filter: C,
    ) -> Self {
        let mut options: SstableBuilderOptions = context.storage_opts.as_ref().into();
        let compression_algorithm: CompressionAlgorithm = task.compression_algorithm.into();
        options.compression_algorithm = compression_algorithm;
        options.capacity = task.target_file_size as usize;
        options.max_vnode_key_range_bytes = None;

        let get_id_time = Arc::new(AtomicU64::new(0));

        let key_range = KeyRange::inf();

        let task_config = TaskConfig {
            key_range,
            cache_policy: CachePolicy::NotFill,
            gc_delete_keys: task.gc_delete_keys,
            retain_multiple_version: false,
            stats_target_table_ids: Some(HashSet::from_iter(task.existing_table_ids.clone())),
            table_vnode_partition: task.table_vnode_partition.clone(),
            use_block_based_filter: true,
            table_schemas: Default::default(),
            disable_drop_column_optimization: false,
        };
        let factory = UnifiedSstableWriterFactory::new(context.sstable_store.clone());

        let builder_factory = RemoteBuilderFactory::<_, BlockedXor16FilterBuilder> {
            object_id_getter,
            limiter: context.memory_limiter.clone(),
            options,
            policy: task_config.cache_policy,
            remote_rpc_cost: get_id_time,
            compaction_catalog_agent_ref: compaction_catalog_agent_ref.clone(),
            sstable_writer_factory: factory,
            _phantom: PhantomData,
        };
        let sst_builder = CapacitySplitTableBuilder::new(
            builder_factory,
            context.compactor_metrics.clone(),
            Some(task_progress.clone()),
            task_config.table_vnode_partition.clone(),
            context
                .storage_opts
                .compactor_concurrent_uploading_sst_count,
            compaction_catalog_agent_ref.clone(),
        );
        // The fast compaction path only handles tasks with exactly two input levels.
        assert_eq!(
            task.input_ssts.len(),
            2,
            "TaskId {} target_level {:?} task {:?}",
            task.task_id,
            task.target_level,
            compact_task_to_string(&task)
        );
        let left = Box::new(ConcatSstableIterator::new(
            task.input_ssts[0].table_infos.clone(),
            context.sstable_store.clone(),
            task_progress.clone(),
            context.storage_opts.compactor_iter_max_io_retry_times,
        ));
        let right = Box::new(ConcatSstableIterator::new(
            task.input_ssts[1].table_infos.clone(),
            context.sstable_store,
            task_progress.clone(),
            context.storage_opts.compactor_iter_max_io_retry_times,
        ));

        let pk_prefix_state = PkPrefixSkipWatermarkState::from_safe_epoch_watermarks(
            task.pk_prefix_table_watermarks.clone(),
        );
        let non_pk_prefix_state = NonPkPrefixSkipWatermarkState::from_safe_epoch_watermarks(
            task.non_pk_prefix_table_watermarks.clone(),
            compaction_catalog_agent_ref.clone(),
        );
        let value_skip_watermark_state = ValueSkipWatermarkState::from_safe_epoch_watermarks(
            task.value_table_watermarks.clone(),
            compaction_catalog_agent_ref,
        );

        Self {
            executor: CompactTaskExecutor::new(
                sst_builder,
                task_config,
                task_progress,
                pk_prefix_state,
                non_pk_prefix_state,
                value_skip_watermark_state,
                compaction_filter,
            ),
            left,
            right,
            task_id: task.task_id,
            metrics: context.compactor_metrics,
            compression_algorithm,
        }
    }

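    /// Merges `left` and `right`. While both runs are valid, whole blocks of
    /// the side with the smaller key are copied via `add_raw_block` as long as
    /// they end strictly before the other side's current key; otherwise keys
    /// are merged one by one through `CompactTaskExecutor::run`. The remaining
    /// run is then drained the same way.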
    pub async fn run(mut self) -> HummockResult<(Vec<LocalSstableInfo>, CompactionStatistics)> {
        self.left.rewind().await?;
        self.right.rewind().await?;
        let mut skip_raw_block_count = 0;
        let mut skip_raw_block_size = 0;
        while self.left.is_valid() && self.right.is_valid() {
            let ret = self
                .left
                .current_sstable()
                .key()
                .cmp(&self.right.current_sstable().key());
            let (first, second) = if ret == Ordering::Less {
                (&mut self.left, &mut self.right)
            } else {
                (&mut self.right, &mut self.left)
            };
            assert!(
                ret != Ordering::Equal,
                "sst range overlap equal_key {:?}",
                self.left.current_sstable().key()
            );
            if first.current_sstable().iter.is_none() {
                let right_key = second.current_sstable().key();
                // Copy whole raw blocks of `first` while they end strictly
                // before the other side's current key and no flush is pending.
                while first.current_sstable().is_valid() && !self.executor.builder.need_flush() {
                    let full_key = FullKey::decode(first.current_sstable().next_block_largest());
                    // The next block may overlap `second`, so it has to be
                    // merged key by key instead of being copied.
                    if full_key.user_key.ge(&right_key.user_key) {
                        break;
                    }
                    let smallest_key =
                        FullKey::decode(first.current_sstable().next_block_smallest());
                    if !self.executor.shall_copy_raw_block(&smallest_key) {
                        break;
                    }
                    let smallest_key = smallest_key.to_vec();

                    let (mut block, filter_data, mut meta) = first
                        .current_sstable()
                        .download_next_block()
                        .await?
                        .unwrap();
                    // Recompress uncompressed blocks if the target algorithm
                    // requires compression.
                    let algorithm = Block::get_algorithm(&block)?;
                    if algorithm == CompressionAlgorithm::None
                        && algorithm != self.compression_algorithm
                    {
                        block = BlockBuilder::compress_block(block, self.compression_algorithm)?;
                        meta.len = block.len() as u32;
                    }

                    let largest_key = first.current_sstable().current_block_largest();
                    let block_len = block.len() as u64;
                    let block_key_count = meta.total_key_count;

                    if self
                        .executor
                        .builder
                        .add_raw_block(block, filter_data, smallest_key, largest_key, meta)
                        .await?
                    {
                        skip_raw_block_size += block_len;
                        skip_raw_block_count += 1;
                    }
                    self.executor.may_report_process_key(block_key_count);
                    self.executor.clear();
                }
                if !first.current_sstable().is_valid() {
                    first.next_sstable().await?;
                    continue;
                }
                first.init_block_iter().await?;
            }

            let target_key = second.current_sstable().key();
            let iter = first.sstable_iter.as_mut().unwrap().iter.as_mut().unwrap();
            self.executor.run(iter, target_key).await?;
            if !iter.is_valid() {
                first.sstable_iter.as_mut().unwrap().iter.take();
                if !first.current_sstable().is_valid() {
                    first.next_sstable().await?;
                }
            }
        }
        let rest_data = if !self.left.is_valid() {
            &mut self.right
        } else {
            &mut self.left
        };
        if rest_data.is_valid() {
            // Finish the block that is currently open before switching back to
            // raw-block copying.
            let sstable_iter = rest_data.sstable_iter.as_mut().unwrap();
            let target_key = FullKey::decode(&sstable_iter.sstable.meta.largest_key);
            if let Some(iter) = sstable_iter.iter.as_mut() {
                self.executor.run(iter, target_key).await?;
                assert!(
                    !iter.is_valid(),
                    "iter should not be valid key {:?}",
                    iter.key()
                );
            }
            sstable_iter.iter.take();
        }

        while rest_data.is_valid() {
            let mut sstable_iter = rest_data.sstable_iter.take().unwrap();
            while sstable_iter.is_valid() {
                let smallest_key = FullKey::decode(sstable_iter.next_block_smallest()).to_vec();
                let (block, filter_data, block_meta) =
                    sstable_iter.download_next_block().await?.unwrap();
                // If the last processed version of this user key was a
                // tombstone, the block cannot be copied as-is.
                let need_deleted = self.executor.last_key.user_key.eq(&smallest_key.user_key)
                    && self.executor.last_key_is_delete;
                if self.executor.builder.need_flush()
                    || need_deleted
                    || !self.executor.shall_copy_raw_block(&smallest_key.to_ref())
                {
                    let largest_key = sstable_iter.sstable.meta.largest_key.clone();
                    let target_key = FullKey::decode(&largest_key);
                    sstable_iter.init_block_iter(block, block_meta.uncompressed_size as usize)?;
                    let mut iter = sstable_iter.iter.take().unwrap();
                    self.executor.run(&mut iter, target_key).await?;
                } else {
                    let largest_key = sstable_iter.current_block_largest();
                    let block_len = block.len() as u64;
                    let block_key_count = block_meta.total_key_count;
                    if self
                        .executor
                        .builder
                        .add_raw_block(block, filter_data, smallest_key, largest_key, block_meta)
                        .await?
                    {
                        skip_raw_block_count += 1;
                        skip_raw_block_size += block_len;
                    }
                    self.executor.may_report_process_key(block_key_count);
                    self.executor.clear();
                }
            }
            rest_data.next_sstable().await?;
        }
        let mut total_read_bytes = 0;
        for sst in &self.left.sstables {
            total_read_bytes += sst.sst_size;
        }
        for sst in &self.right.sstables {
            total_read_bytes += sst.sst_size;
        }
        self.metrics
            .compact_fast_runner_bytes
            .inc_by(skip_raw_block_size);
        tracing::info!(
            "OPTIMIZATION: skip {} blocks for task-{}, optimize {}% data compression",
            skip_raw_block_count,
            self.task_id,
            skip_raw_block_size * 100 / total_read_bytes,
        );

        let statistic = self.executor.take_statistics();
        let output_ssts = self.executor.builder.finish().await?;
        Compactor::report_progress(
            self.metrics.clone(),
            Some(self.executor.task_progress.clone()),
            &output_ssts,
            false,
        );
        let sst_infos = output_ssts
            .iter()
            .map(|sst| sst.sst_info.clone())
            .collect_vec();
        assert!(can_concat(&sst_infos));
        Ok((output_ssts, statistic))
    }
}

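/// Key-by-key compaction executor shared by the raw-block fast path and the
/// fallback merge path: applies version GC, the compaction filter and table
/// watermarks, and feeds surviving keys into the SST builder.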
pub struct CompactTaskExecutor<F: TableBuilderFactory, C: CompactionFilter> {
    last_key: FullKey<Vec<u8>>,
    compaction_statistics: CompactionStatistics,
    last_table_id: Option<TableId>,
    last_table_stats: TableStats,
    builder: CapacitySplitTableBuilder<F>,
    task_config: TaskConfig,
    task_progress: Arc<TaskProgress>,
    pk_prefix_skip_watermark_state: PkPrefixSkipWatermarkState,
    last_key_is_delete: bool,
    progress_key_num: u32,
    non_pk_prefix_skip_watermark_state: NonPkPrefixSkipWatermarkState,
    value_skip_watermark_state: ValueSkipWatermarkState,
    compaction_filter: C,
}

impl<F: TableBuilderFactory, C: CompactionFilter> CompactTaskExecutor<F, C> {
    pub fn new(
        builder: CapacitySplitTableBuilder<F>,
        task_config: TaskConfig,
        task_progress: Arc<TaskProgress>,
        pk_prefix_skip_watermark_state: PkPrefixSkipWatermarkState,
        non_pk_prefix_skip_watermark_state: NonPkPrefixSkipWatermarkState,
        value_skip_watermark_state: ValueSkipWatermarkState,
        compaction_filter: C,
    ) -> Self {
        Self {
            builder,
            task_config,
            last_key: FullKey::default(),
            last_key_is_delete: false,
            compaction_statistics: CompactionStatistics::default(),
            last_table_id: None,
            last_table_stats: TableStats::default(),
            task_progress,
            pk_prefix_skip_watermark_state,
            progress_key_num: 0,
            non_pk_prefix_skip_watermark_state,
            value_skip_watermark_state,
            compaction_filter,
        }
    }

    fn take_statistics(&mut self) -> CompactionStatistics {
        if let Some(last_table_id) = self.last_table_id.take() {
            self.compaction_statistics
                .delta_drop_stat
                .insert(last_table_id, std::mem::take(&mut self.last_table_stats));
        }
        std::mem::take(&mut self.compaction_statistics)
    }

    fn clear(&mut self) {
        if !self.last_key.is_empty() {
            self.last_key = FullKey::default();
        }
        self.last_key_is_delete = false;
    }

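    /// Accumulates processed-key counts and flushes them to `task_progress`
    /// every `PROGRESS_KEY_INTERVAL` keys, keeping progress reporting cheap.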
    #[inline(always)]
    fn may_report_process_key(&mut self, key_count: u32) {
        const PROGRESS_KEY_INTERVAL: u32 = 100;
        self.progress_key_num += key_count;
        if self.progress_key_num > PROGRESS_KEY_INTERVAL {
            self.task_progress
                .inc_progress_key(self.progress_key_num as u64);
            self.progress_key_num = 0;
        }
    }

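    /// Consumes `iter` up to and including `target_key`, deciding for each key
    /// whether to drop it (stale version, GC-able tombstone, compaction filter
    /// or watermark) or to append it to the builder.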
    pub async fn run(
        &mut self,
        iter: &mut BlockIterator,
        target_key: FullKey<&[u8]>,
    ) -> HummockResult<()> {
        self.pk_prefix_skip_watermark_state.reset_watermark();
        self.non_pk_prefix_skip_watermark_state.reset_watermark();

        while iter.is_valid() && iter.key().le(&target_key) {
            let is_new_user_key =
                !self.last_key.is_empty() && iter.key().user_key != self.last_key.user_key.as_ref();
            self.compaction_statistics.iter_total_key_counts += 1;
            self.may_report_process_key(1);

            let mut drop = false;
            let value = HummockValue::from_slice(iter.value()).unwrap();
            let is_first_or_new_user_key = is_new_user_key || self.last_key.is_empty();
            if is_first_or_new_user_key {
                self.last_key.set(iter.key());
                self.last_key_is_delete = false;
            }

            // GC tombstones when allowed; otherwise keep only the newest
            // version of each user key, since `retain_multiple_version` is
            // false on this path.
            if !self.task_config.retain_multiple_version
                && self.task_config.gc_delete_keys
                && value.is_delete()
            {
                drop = true;
                self.last_key_is_delete = true;
            } else if !self.task_config.retain_multiple_version && !is_first_or_new_user_key {
                drop = true;
            }

            if !drop && self.compaction_filter.should_delete(iter.key()) {
                drop = true;
            }

            if !drop && self.watermark_should_delete(&iter.key(), value) {
                drop = true;
                self.last_key_is_delete = true;
            }

            if self.last_table_id != Some(self.last_key.user_key.table_id) {
                if let Some(last_table_id) = self.last_table_id.take() {
                    self.compaction_statistics
                        .delta_drop_stat
                        .insert(last_table_id, std::mem::take(&mut self.last_table_stats));
                }
                self.last_table_id = Some(self.last_key.user_key.table_id);
            }

            if drop {
                self.compaction_statistics.iter_drop_key_counts += 1;

                let should_count = match self.task_config.stats_target_table_ids.as_ref() {
                    Some(target_table_ids) => {
                        target_table_ids.contains(&self.last_key.user_key.table_id)
                    }
                    None => true,
                };
                if should_count {
                    self.last_table_stats.total_key_count -= 1;
                    self.last_table_stats.total_key_size -= self.last_key.encoded_len() as i64;
                    self.last_table_stats.total_value_size -= value.encoded_len() as i64;
                }
                iter.next();
                continue;
            }
            self.builder
                .add_full_key(iter.key(), value, is_new_user_key)
                .await?;
            iter.next();
        }
        Ok(())
    }

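    /// Returns `true` if the block starting at `smallest_key` may be copied
    /// into the output as-is, i.e. its boundary key is not shadowed by a
    /// pending tombstone and neither the watermarks nor the compaction filter
    /// match the block's smallest key.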
    pub fn shall_copy_raw_block(&mut self, smallest_key: &FullKey<&[u8]>) -> bool {
        if self.last_key_is_delete && self.last_key.user_key.as_ref().eq(&smallest_key.user_key) {
            // The user key at the block boundary was last seen as a tombstone,
            // so older versions inside this block must not be copied verbatim.
            return false;
        }

        if self.watermark_may_delete(smallest_key) {
            return false;
        }

        // Only the block's smallest key is checked here; a hit means the
        // filter may drop keys in this block, so fall back to merging.
        if self.compaction_filter.should_delete(*smallest_key) {
            return false;
        }

        true
    }

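    /// Conservative check used by the raw-block fast path: returns `true` if
    /// some watermark might delete keys in the block starting at `key`.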
    fn watermark_may_delete(&mut self, key: &FullKey<&[u8]>) -> bool {
        // The pk-prefix and non-pk-prefix checks ignore the value, so a
        // placeholder `Put` is passed in.
        let pk_prefix_has_watermark = self.pk_prefix_skip_watermark_state.has_watermark();
        let non_pk_prefix_has_watermark = self.non_pk_prefix_skip_watermark_state.has_watermark();
        if pk_prefix_has_watermark || non_pk_prefix_has_watermark {
            let unused = vec![];
            let unused_put = HummockValue::Put(unused.as_slice());
            if (pk_prefix_has_watermark
                && self
                    .pk_prefix_skip_watermark_state
                    .should_delete(key, unused_put))
                || (non_pk_prefix_has_watermark
                    && self
                        .non_pk_prefix_skip_watermark_state
                        .should_delete(key, unused_put))
            {
                return true;
            }
        }
        self.value_skip_watermark_state.has_watermark()
            && self.value_skip_watermark_state.may_delete(key)
    }

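    /// Exact per-key check: returns `true` if any of the three watermark
    /// states deletes this `(key, value)` pair.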
    fn watermark_should_delete(
        &mut self,
        key: &FullKey<&[u8]>,
        value: HummockValue<&[u8]>,
    ) -> bool {
        (self.pk_prefix_skip_watermark_state.has_watermark()
            && self
                .pk_prefix_skip_watermark_state
                .should_delete(key, value))
            || (self.non_pk_prefix_skip_watermark_state.has_watermark()
                && self
                    .non_pk_prefix_skip_watermark_state
                    .should_delete(key, value))
            || (self.value_skip_watermark_state.has_watermark()
                && self.value_skip_watermark_state.should_delete(key, value))
    }
}