use std::cmp::Ordering;
use std::collections::HashSet;
use std::marker::PhantomData;
use std::sync::atomic::AtomicU64;
use std::sync::{Arc, atomic};
use std::time::Instant;

use await_tree::{InstrumentAwait, SpanExt};
use bytes::Bytes;
use fail::fail_point;
use itertools::Itertools;
use risingwave_common::catalog::TableId;
use risingwave_hummock_sdk::compact_task::CompactTask;
use risingwave_hummock_sdk::key::FullKey;
use risingwave_hummock_sdk::key_range::KeyRange;
use risingwave_hummock_sdk::sstable_info::SstableInfo;
use risingwave_hummock_sdk::table_stats::TableStats;
use risingwave_hummock_sdk::{EpochWithGap, LocalSstableInfo, can_concat, compact_task_to_string};

use crate::compaction_catalog_manager::CompactionCatalogAgentRef;
use crate::hummock::block_stream::BlockDataStream;
use crate::hummock::compactor::task_progress::TaskProgress;
use crate::hummock::compactor::{
    CompactionFilter, CompactionStatistics, Compactor, CompactorContext, MultiCompactionFilter,
    RemoteBuilderFactory, TaskConfig,
};
use crate::hummock::iterator::{
    NonPkPrefixSkipWatermarkState, PkPrefixSkipWatermarkState, SkipWatermarkState,
    ValueSkipWatermarkState,
};
use crate::hummock::multi_builder::{CapacitySplitTableBuilder, TableBuilderFactory};
use crate::hummock::sstable_store::SstableStoreRef;
use crate::hummock::value::HummockValue;
use crate::hummock::{
    Block, BlockBuilder, BlockHolder, BlockIterator, BlockMeta, BlockedXor16FilterBuilder,
    CachePolicy, CompressionAlgorithm, GetObjectId, HummockResult, SstableBuilderOptions,
    TableHolder, UnifiedSstableWriterFactory,
};
use crate::monitor::{CompactorMetrics, StoreLocalStatistic};

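/// Streams the blocks of a single SSTable from the object store, downloading them one at a
/// time and optionally exposing a `BlockIterator` over the block that was decoded most
/// recently.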
pub struct BlockStreamIterator {
    /// The underlying block stream, created lazily and recreated after retryable IO errors.
    block_stream: Option<BlockDataStream>,

    /// Index of the next block to download from the stream.
    next_block_index: usize,

    /// In-memory handle of the SSTable (metadata and filter reader).
    sstable: TableHolder,
    /// Iterator over the block that has been downloaded and decoded most recently.
    iter: Option<BlockIterator>,
    task_progress: Arc<TaskProgress>,

    sstable_store: SstableStoreRef,
    sstable_info: SstableInfo,
    io_retry_times: usize,
    max_io_retry_times: usize,
    /// Accumulates remote IO time in milliseconds.
    stats_ptr: Arc<AtomicU64>,
}

impl BlockStreamIterator {
    pub fn new(
        sstable: TableHolder,
        task_progress: Arc<TaskProgress>,
        sstable_store: SstableStoreRef,
        sstable_info: SstableInfo,
        max_io_retry_times: usize,
        stats_ptr: Arc<AtomicU64>,
    ) -> Self {
        Self {
            block_stream: None,
            next_block_index: 0,
            sstable,
            iter: None,
            task_progress,
            sstable_store,
            sstable_info,
            io_retry_times: 0,
            max_io_retry_times,
            stats_ptr,
        }
    }

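    /// (Re)creates the block stream, starting at the first block that has not been
    /// downloaded yet.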
    async fn create_stream(&mut self) -> HummockResult<()> {
        let block_stream = self
            .sstable_store
            .get_stream_for_blocks(
                self.sstable_info.object_id,
                &self.sstable.meta.block_metas[self.next_block_index..],
            )
            .instrument_await("stream_iter_get_stream".verbose())
            .await?;
        self.block_stream = Some(block_stream);
        Ok(())
    }

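    /// Downloads the next block, returning its (still compressed) data, the raw filter data
    /// for the block, and its metadata. Returns `Ok(None)` once the SSTable is exhausted.
    /// On a retryable object-store error the stream is dropped and recreated, up to
    /// `max_io_retry_times` times.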
    pub(crate) async fn download_next_block(
        &mut self,
    ) -> HummockResult<Option<(Bytes, Vec<u8>, BlockMeta)>> {
        let now = Instant::now();
        let _time_stat = scopeguard::guard(self.stats_ptr.clone(), |stats_ptr: Arc<AtomicU64>| {
            let add = (now.elapsed().as_secs_f64() * 1000.0).ceil();
            stats_ptr.fetch_add(add as u64, atomic::Ordering::Relaxed);
        });
        loop {
            let ret = match &mut self.block_stream {
                Some(block_stream) => block_stream.next_block_impl().await,
                None => {
                    self.create_stream().await?;
                    continue;
                }
            };
            match ret {
                Ok(Some((data, _))) => {
                    let meta = self.sstable.meta.block_metas[self.next_block_index].clone();
                    let filter_block = self
                        .sstable
                        .filter_reader
                        .get_block_raw_filter(self.next_block_index);
                    self.next_block_index += 1;
                    return Ok(Some((data, filter_block, meta)));
                }

                Ok(None) => break,

                Err(e) => {
                    if !e.is_object_error() || self.io_retry_times >= self.max_io_retry_times {
                        return Err(e);
                    }

                    self.block_stream.take();
                    self.io_retry_times += 1;
                    fail_point!("create_stream_err");

                    tracing::warn!(
                        "fast compact retry create stream for sstable {} times, sstinfo={}",
                        self.io_retry_times,
                        format!(
                            "object_id={}, sst_id={}, meta_offset={}, table_ids={:?}",
                            self.sstable_info.object_id,
                            self.sstable_info.sst_id,
                            self.sstable_info.meta_offset,
                            self.sstable_info.table_ids
                        )
                    );
                }
            }
        }

        self.next_block_index = self.sstable.meta.block_metas.len();
        self.iter.take();
        Ok(None)
    }

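    /// Decodes a downloaded block and positions a `BlockIterator` at its first entry.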
    pub(crate) fn init_block_iter(
        &mut self,
        buf: Bytes,
        uncompressed_capacity: usize,
    ) -> HummockResult<()> {
        let block = Block::decode(buf, uncompressed_capacity)?;
        let mut iter = BlockIterator::new(BlockHolder::from_owned_block(Box::new(block)));
        iter.seek_to_first();
        self.iter = Some(iter);
        Ok(())
    }

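    /// Returns the smallest key of the block that would be downloaded next.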
    fn next_block_smallest(&self) -> &[u8] {
        self.sstable.meta.block_metas[self.next_block_index]
            .smallest_key
            .as_ref()
    }

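    /// Returns an upper bound for the keys of the next block: the smallest key of the block
    /// after it, or the SSTable's largest key if the next block is the last one.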
    fn next_block_largest(&self) -> &[u8] {
        if self.next_block_index + 1 < self.sstable.meta.block_metas.len() {
            self.sstable.meta.block_metas[self.next_block_index + 1]
                .smallest_key
                .as_ref()
        } else {
            self.sstable.meta.largest_key.as_ref()
        }
    }

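    /// Returns an upper bound for the keys of the block downloaded most recently: the next
    /// block's smallest key with the maximum epoch, or the SSTable's largest key if the
    /// current block is the last one.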
    fn current_block_largest(&self) -> Vec<u8> {
        if self.next_block_index < self.sstable.meta.block_metas.len() {
            let mut largest_key = FullKey::decode(
                self.sstable.meta.block_metas[self.next_block_index]
                    .smallest_key
                    .as_ref(),
            );
            largest_key.epoch_with_gap = EpochWithGap::new_max_epoch();
            largest_key.encode()
        } else {
            self.sstable.meta.largest_key.clone()
        }
    }

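    /// Returns the current key: the block iterator's key if a block is being iterated,
    /// otherwise the smallest key of the next block to download.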
    fn key(&self) -> FullKey<&[u8]> {
        match self.iter.as_ref() {
            Some(iter) => iter.key(),
            None => FullKey::decode(
                self.sstable.meta.block_metas[self.next_block_index]
                    .smallest_key
                    .as_ref(),
            ),
        }
    }

    pub(crate) fn is_valid(&self) -> bool {
        self.iter.is_some() || self.next_block_index < self.sstable.meta.block_metas.len()
    }

    #[cfg(test)]
    #[cfg(feature = "failpoints")]
    pub(crate) fn iter_mut(&mut self) -> &mut BlockIterator {
        self.iter.as_mut().unwrap()
    }
}

impl Drop for BlockStreamIterator {
    fn drop(&mut self) {
        self.task_progress.dec_num_pending_read_io();
    }
}

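/// Iterates over a list of SSTables one after another, creating a `BlockStreamIterator`
/// for the SSTable at the current position.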
pub struct ConcatSstableIterator {
    /// Iterator over the blocks of the SSTable at `cur_idx`.
    sstable_iter: Option<BlockStreamIterator>,

    /// Index of the SSTable currently being read.
    cur_idx: usize,

    /// All SSTables to iterate over, ordered by key range.
    sstables: Vec<SstableInfo>,

    sstable_store: SstableStoreRef,

    stats: StoreLocalStatistic,
    task_progress: Arc<TaskProgress>,

    max_io_retry_times: usize,
}

impl ConcatSstableIterator {
    pub fn new(
        sst_infos: Vec<SstableInfo>,
        sstable_store: SstableStoreRef,
        task_progress: Arc<TaskProgress>,
        max_io_retry_times: usize,
    ) -> Self {
        Self {
            sstable_iter: None,
            cur_idx: 0,
            sstables: sst_infos,
            sstable_store,
            task_progress,
            stats: StoreLocalStatistic::default(),
            max_io_retry_times,
        }
    }

    pub async fn rewind(&mut self) -> HummockResult<()> {
        self.seek_idx(0).await
    }

    pub async fn next_sstable(&mut self) -> HummockResult<()> {
        self.seek_idx(self.cur_idx + 1).await
    }

    pub fn current_sstable(&mut self) -> &mut BlockStreamIterator {
        self.sstable_iter.as_mut().unwrap()
    }

    pub async fn init_block_iter(&mut self) -> HummockResult<()> {
        if let Some(sstable) = self.sstable_iter.as_mut() {
            if sstable.iter.is_some() {
                return Ok(());
            }
            let (buf, _, meta) = sstable.download_next_block().await?.unwrap();
            sstable.init_block_iter(buf, meta.uncompressed_size as usize)?;
        }
        Ok(())
    }

    pub fn is_valid(&self) -> bool {
        self.cur_idx < self.sstables.len()
    }

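    /// Resets the iterator to the SSTable at index `idx`, fetching its metadata and creating
    /// a new `BlockStreamIterator` for it if the index is in range.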
    async fn seek_idx(&mut self, idx: usize) -> HummockResult<()> {
        self.sstable_iter.take();
        self.cur_idx = idx;
        if self.cur_idx < self.sstables.len() {
            let sstable_info = &self.sstables[self.cur_idx];
            let sstable = self
                .sstable_store
                .sstable(sstable_info, &mut self.stats)
                .instrument_await("stream_iter_sstable".verbose())
                .await?;
            self.task_progress.inc_num_pending_read_io();

            let sstable_iter = BlockStreamIterator::new(
                sstable,
                self.task_progress.clone(),
                self.sstable_store.clone(),
                sstable_info.clone(),
                self.max_io_retry_times,
                self.stats.remote_io_time.clone(),
            );
            self.sstable_iter = Some(sstable_iter);
        }
        Ok(())
    }
}

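/// Fast compaction runner for tasks with exactly two input levels whose SSTables do not
/// overlap within each level: whole raw blocks are copied into the output wherever possible
/// and only overlapping key ranges are decoded and merged entry by entry.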
pub struct CompactorRunner<C: CompactionFilter = MultiCompactionFilter> {
    /// Iterator over the first input level.
    left: Box<ConcatSstableIterator>,
    /// Iterator over the second input level.
    right: Box<ConcatSstableIterator>,
    task_id: u64,
    executor: CompactTaskExecutor<
        RemoteBuilderFactory<UnifiedSstableWriterFactory, BlockedXor16FilterBuilder>,
        C,
    >,
    compression_algorithm: CompressionAlgorithm,
    metrics: Arc<CompactorMetrics>,
}

impl<C: CompactionFilter> CompactorRunner<C> {
    pub fn new(
        context: CompactorContext,
        task: CompactTask,
        compaction_catalog_agent_ref: CompactionCatalogAgentRef,
        object_id_getter: Arc<dyn GetObjectId>,
        task_progress: Arc<TaskProgress>,
        compaction_filter: C,
    ) -> Self {
        let mut options: SstableBuilderOptions = context.storage_opts.as_ref().into();
        let compression_algorithm: CompressionAlgorithm = task.compression_algorithm.into();
        options.compression_algorithm = compression_algorithm;
        options.capacity = task.target_file_size as usize;
        let get_id_time = Arc::new(AtomicU64::new(0));

        let key_range = KeyRange::inf();

        let task_config = TaskConfig {
            key_range,
            cache_policy: CachePolicy::NotFill,
            gc_delete_keys: task.gc_delete_keys,
            retain_multiple_version: false,
            stats_target_table_ids: Some(HashSet::from_iter(task.existing_table_ids.clone())),
            task_type: task.task_type,
            table_vnode_partition: task.table_vnode_partition.clone(),
            use_block_based_filter: true,
            table_schemas: Default::default(),
            disable_drop_column_optimization: false,
        };
        let factory = UnifiedSstableWriterFactory::new(context.sstable_store.clone());

        let builder_factory = RemoteBuilderFactory::<_, BlockedXor16FilterBuilder> {
            object_id_getter,
            limiter: context.memory_limiter.clone(),
            options,
            policy: task_config.cache_policy,
            remote_rpc_cost: get_id_time,
            compaction_catalog_agent_ref: compaction_catalog_agent_ref.clone(),
            sstable_writer_factory: factory,
            _phantom: PhantomData,
        };
        let sst_builder = CapacitySplitTableBuilder::new(
            builder_factory,
            context.compactor_metrics.clone(),
            Some(task_progress.clone()),
            task_config.table_vnode_partition.clone(),
            context
                .storage_opts
                .compactor_concurrent_uploading_sst_count,
            compaction_catalog_agent_ref.clone(),
        );
        assert_eq!(
            task.input_ssts.len(),
            2,
            "TaskId {} target_level {:?} task {:?}",
            task.task_id,
            task.target_level,
            compact_task_to_string(&task)
        );
        let left = Box::new(ConcatSstableIterator::new(
            task.input_ssts[0].table_infos.clone(),
            context.sstable_store.clone(),
            task_progress.clone(),
            context.storage_opts.compactor_iter_max_io_retry_times,
        ));
        let right = Box::new(ConcatSstableIterator::new(
            task.input_ssts[1].table_infos.clone(),
            context.sstable_store,
            task_progress.clone(),
            context.storage_opts.compactor_iter_max_io_retry_times,
        ));

        let pk_prefix_state = PkPrefixSkipWatermarkState::from_safe_epoch_watermarks(
            task.pk_prefix_table_watermarks.clone(),
        );
        let non_pk_prefix_state = NonPkPrefixSkipWatermarkState::from_safe_epoch_watermarks(
            task.non_pk_prefix_table_watermarks.clone(),
            compaction_catalog_agent_ref.clone(),
        );
        let value_skip_watermark_state = ValueSkipWatermarkState::from_safe_epoch_watermarks(
            task.value_table_watermarks.clone(),
            compaction_catalog_agent_ref,
        );

        Self {
            executor: CompactTaskExecutor::new(
                sst_builder,
                task_config,
                task_progress,
                pk_prefix_state,
                non_pk_prefix_state,
                value_skip_watermark_state,
                compaction_filter,
            ),
            left,
            right,
            task_id: task.task_id,
            metrics: context.compactor_metrics,
            compression_algorithm,
        }
    }

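    /// Runs the fast compaction: the two input levels are merged in key order, copying whole
    /// raw blocks into the output whenever a block cannot overlap with the other side and
    /// does not need rewriting, and falling back to key-by-key merging otherwise.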
    pub async fn run(mut self) -> HummockResult<(Vec<LocalSstableInfo>, CompactionStatistics)> {
        self.left.rewind().await?;
        self.right.rewind().await?;
        let mut skip_raw_block_count = 0;
        let mut skip_raw_block_size = 0;
        while self.left.is_valid() && self.right.is_valid() {
            let ret = self
                .left
                .current_sstable()
                .key()
                .cmp(&self.right.current_sstable().key());
            let (first, second) = if ret == Ordering::Less {
                (&mut self.left, &mut self.right)
            } else {
                (&mut self.right, &mut self.left)
            };
            assert!(
                ret != Ordering::Equal,
                "sst range overlap equal_key {:?}",
                self.left.current_sstable().key()
            );
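            // While `first` has no active block iterator, copy its whole blocks directly into
            // the output as long as they end before `second`'s current key and do not need to
            // be rewritten.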
            if first.current_sstable().iter.is_none() {
                let right_key = second.current_sstable().key();
                while first.current_sstable().is_valid() && !self.executor.builder.need_flush() {
                    let full_key = FullKey::decode(first.current_sstable().next_block_largest());
                    if full_key.user_key.ge(&right_key.user_key) {
                        break;
                    }
                    let smallest_key =
                        FullKey::decode(first.current_sstable().next_block_smallest());
                    if !self.executor.shall_copy_raw_block(&smallest_key) {
                        break;
                    }
                    let smallest_key = smallest_key.to_vec();

                    let (mut block, filter_data, mut meta) = first
                        .current_sstable()
                        .download_next_block()
                        .await?
                        .unwrap();
                    let algorithm = Block::get_algorithm(&block)?;
                    if algorithm == CompressionAlgorithm::None
                        && algorithm != self.compression_algorithm
                    {
                        block = BlockBuilder::compress_block(block, self.compression_algorithm)?;
                        meta.len = block.len() as u32;
                    }

                    let largest_key = first.current_sstable().current_block_largest();
                    let block_len = block.len() as u64;
                    let block_key_count = meta.total_key_count;

                    if self
                        .executor
                        .builder
                        .add_raw_block(block, filter_data, smallest_key, largest_key, meta)
                        .await?
                    {
                        skip_raw_block_size += block_len;
                        skip_raw_block_count += 1;
                    }
                    self.executor.may_report_process_key(block_key_count);
                    self.executor.clear();
                }
                if !first.current_sstable().is_valid() {
                    first.next_sstable().await?;
                    continue;
                }
                first.init_block_iter().await?;
            }

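            // The remaining keys overlap with `second`'s current key range, so merge them
            // entry by entry up to that key.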
            let target_key = second.current_sstable().key();
            let iter = first.sstable_iter.as_mut().unwrap().iter.as_mut().unwrap();
            self.executor.run(iter, target_key).await?;
            if !iter.is_valid() {
                first.sstable_iter.as_mut().unwrap().iter.take();
                if !first.current_sstable().is_valid() {
                    first.next_sstable().await?;
                }
            }
        }
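        // At most one side still has data; drain it into the output.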
        let rest_data = if !self.left.is_valid() {
            &mut self.right
        } else {
            &mut self.left
        };
        if rest_data.is_valid() {
            // The current SSTable may have a partially consumed block iterator; finish it first.
            let sstable_iter = rest_data.sstable_iter.as_mut().unwrap();
            let target_key = FullKey::decode(&sstable_iter.sstable.meta.largest_key);
            if let Some(iter) = sstable_iter.iter.as_mut() {
                self.executor.run(iter, target_key).await?;
                assert!(
                    !iter.is_valid(),
                    "iter should not be valid key {:?}",
                    iter.key()
                );
            }
            sstable_iter.iter.take();
        }

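        // Copy the remaining blocks of the rest data, rewriting only those blocks that must
        // be flushed, deduplicated against the last emitted key, or filtered.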
        while rest_data.is_valid() {
            let mut sstable_iter = rest_data.sstable_iter.take().unwrap();
            while sstable_iter.is_valid() {
                let smallest_key = FullKey::decode(sstable_iter.next_block_smallest()).to_vec();
                let (block, filter_data, block_meta) =
                    sstable_iter.download_next_block().await?.unwrap();
                let need_deleted = self.executor.last_key.user_key.eq(&smallest_key.user_key)
                    && self.executor.last_key_is_delete;
                if self.executor.builder.need_flush()
                    || need_deleted
                    || !self.executor.shall_copy_raw_block(&smallest_key.to_ref())
                {
                    let largest_key = sstable_iter.sstable.meta.largest_key.clone();
                    let target_key = FullKey::decode(&largest_key);
                    sstable_iter.init_block_iter(block, block_meta.uncompressed_size as usize)?;
                    let mut iter = sstable_iter.iter.take().unwrap();
                    self.executor.run(&mut iter, target_key).await?;
                } else {
                    let largest_key = sstable_iter.current_block_largest();
                    let block_len = block.len() as u64;
                    let block_key_count = block_meta.total_key_count;
                    if self
                        .executor
                        .builder
                        .add_raw_block(block, filter_data, smallest_key, largest_key, block_meta)
                        .await?
                    {
                        skip_raw_block_count += 1;
                        skip_raw_block_size += block_len;
                    }
                    self.executor.may_report_process_key(block_key_count);
                    self.executor.clear();
                }
            }
            rest_data.next_sstable().await?;
        }
        let mut total_read_bytes = 0;
        for sst in &self.left.sstables {
            total_read_bytes += sst.sst_size;
        }
        for sst in &self.right.sstables {
            total_read_bytes += sst.sst_size;
        }
        self.metrics
            .compact_fast_runner_bytes
            .inc_by(skip_raw_block_size);
        tracing::info!(
            "OPTIMIZATION: skip {} blocks for task-{}, optimize {}% data compression",
            skip_raw_block_count,
            self.task_id,
            skip_raw_block_size * 100 / total_read_bytes,
        );

        let statistic = self.executor.take_statistics();
        let output_ssts = self.executor.builder.finish().await?;
        Compactor::report_progress(
            self.metrics.clone(),
            Some(self.executor.task_progress.clone()),
            &output_ssts,
            false,
        );
        let sst_infos = output_ssts
            .iter()
            .map(|sst| sst.sst_info.clone())
            .collect_vec();
        assert!(can_concat(&sst_infos));
        Ok((output_ssts, statistic))
    }
}

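/// Applies the per-key compaction logic (version GC, delete-key GC, watermark skipping, and
/// the compaction filter) while feeding keys or whole raw blocks into the SSTable builder.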
pub struct CompactTaskExecutor<F: TableBuilderFactory, C: CompactionFilter> {
    /// The last user key that has been processed, in its full-key form.
    last_key: FullKey<Vec<u8>>,
    compaction_statistics: CompactionStatistics,
    last_table_id: Option<TableId>,
    last_table_stats: TableStats,
    builder: CapacitySplitTableBuilder<F>,
    task_config: TaskConfig,
    task_progress: Arc<TaskProgress>,
    pk_prefix_skip_watermark_state: PkPrefixSkipWatermarkState,
    /// Whether the newest version of `last_key` was dropped as a delete tombstone or by a
    /// watermark.
    last_key_is_delete: bool,
    progress_key_num: u32,
    non_pk_prefix_skip_watermark_state: NonPkPrefixSkipWatermarkState,
    value_skip_watermark_state: ValueSkipWatermarkState,
    compaction_filter: C,
}

impl<F: TableBuilderFactory, C: CompactionFilter> CompactTaskExecutor<F, C> {
    pub fn new(
        builder: CapacitySplitTableBuilder<F>,
        task_config: TaskConfig,
        task_progress: Arc<TaskProgress>,
        pk_prefix_skip_watermark_state: PkPrefixSkipWatermarkState,
        non_pk_prefix_skip_watermark_state: NonPkPrefixSkipWatermarkState,
        value_skip_watermark_state: ValueSkipWatermarkState,
        compaction_filter: C,
    ) -> Self {
        Self {
            builder,
            task_config,
            last_key: FullKey::default(),
            last_key_is_delete: false,
            compaction_statistics: CompactionStatistics::default(),
            last_table_id: None,
            last_table_stats: TableStats::default(),
            task_progress,
            pk_prefix_skip_watermark_state,
            progress_key_num: 0,
            non_pk_prefix_skip_watermark_state,
            value_skip_watermark_state,
            compaction_filter,
        }
    }

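    /// Flushes the per-table stats of the last processed table and returns the accumulated
    /// compaction statistics, resetting them to their default values.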
    fn take_statistics(&mut self) -> CompactionStatistics {
        if let Some(last_table_id) = self.last_table_id.take() {
            self.compaction_statistics
                .delta_drop_stat
                .insert(last_table_id, std::mem::take(&mut self.last_table_stats));
        }
        std::mem::take(&mut self.compaction_statistics)
    }

    fn clear(&mut self) {
        if !self.last_key.is_empty() {
            self.last_key = FullKey::default();
        }
        self.last_key_is_delete = false;
    }

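    /// Accumulates processed key counts and reports them to the task progress once the
    /// total exceeds `PROGRESS_KEY_INTERVAL`.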
    #[inline(always)]
    fn may_report_process_key(&mut self, key_count: u32) {
        const PROGRESS_KEY_INTERVAL: u32 = 100;
        self.progress_key_num += key_count;
        if self.progress_key_num > PROGRESS_KEY_INTERVAL {
            self.task_progress
                .inc_progress_key(self.progress_key_num as u64);
            self.progress_key_num = 0;
        }
    }

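    /// Consumes `iter` up to and including `target_key`, applying version GC, delete-key GC,
    /// the compaction filter, and watermark-based deletion before adding each surviving
    /// entry to the builder.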
    pub async fn run(
        &mut self,
        iter: &mut BlockIterator,
        target_key: FullKey<&[u8]>,
    ) -> HummockResult<()> {
        self.pk_prefix_skip_watermark_state.reset_watermark();
        self.non_pk_prefix_skip_watermark_state.reset_watermark();

        while iter.is_valid() && iter.key().le(&target_key) {
            let is_new_user_key =
                !self.last_key.is_empty() && iter.key().user_key != self.last_key.user_key.as_ref();
            self.compaction_statistics.iter_total_key_counts += 1;
            self.may_report_process_key(1);

            let mut drop = false;
            let value = HummockValue::from_slice(iter.value()).unwrap();
            let is_first_or_new_user_key = is_new_user_key || self.last_key.is_empty();
            if is_first_or_new_user_key {
                self.last_key.set(iter.key());
                self.last_key_is_delete = false;
            }

            if !self.task_config.retain_multiple_version
                && self.task_config.gc_delete_keys
                && value.is_delete()
            {
                drop = true;
                self.last_key_is_delete = true;
            } else if !self.task_config.retain_multiple_version && !is_first_or_new_user_key {
                drop = true;
            }

            if !drop && self.compaction_filter.should_delete(iter.key()) {
                drop = true;
            }

            if !drop && self.watermark_should_delete(&iter.key(), value) {
                drop = true;
                self.last_key_is_delete = true;
            }

            if self.last_table_id != Some(self.last_key.user_key.table_id) {
                if let Some(last_table_id) = self.last_table_id.take() {
                    self.compaction_statistics
                        .delta_drop_stat
                        .insert(last_table_id, std::mem::take(&mut self.last_table_stats));
                }
                self.last_table_id = Some(self.last_key.user_key.table_id);
            }

            if drop {
                self.compaction_statistics.iter_drop_key_counts += 1;

                let should_count = match self.task_config.stats_target_table_ids.as_ref() {
                    Some(target_table_ids) => {
                        target_table_ids.contains(&self.last_key.user_key.table_id)
                    }
                    None => true,
                };
                if should_count {
                    self.last_table_stats.total_key_count -= 1;
                    self.last_table_stats.total_key_size -= self.last_key.encoded_len() as i64;
                    self.last_table_stats.total_value_size -= value.encoded_len() as i64;
                }
                iter.next();
                continue;
            }
            self.builder
                .add_full_key(iter.key(), value, is_new_user_key)
                .await?;
            iter.next();
        }
        Ok(())
    }

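    /// Returns whether a raw block starting at `smallest_key` may be copied into the output
    /// as-is: it must not continue a user key whose newest version was just dropped, and
    /// neither the watermarks nor the compaction filter may apply to it (both checked
    /// against the block's smallest key).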
    pub fn shall_copy_raw_block(&mut self, smallest_key: &FullKey<&[u8]>) -> bool {
        if self.last_key_is_delete && self.last_key.user_key.as_ref().eq(&smallest_key.user_key) {
            return false;
        }

        if self.watermark_may_delete(smallest_key) {
            return false;
        }

        if self.compaction_filter.should_delete(*smallest_key) {
            return false;
        }

        true
    }

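    /// Conservative check used for raw-block copying: returns `true` if any table watermark
    /// might drop `key`. The value is not inspected; a placeholder put value is used for the
    /// pk-prefix and non-pk-prefix checks.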
    fn watermark_may_delete(&mut self, key: &FullKey<&[u8]>) -> bool {
        let pk_prefix_has_watermark = self.pk_prefix_skip_watermark_state.has_watermark();
        let non_pk_prefix_has_watermark = self.non_pk_prefix_skip_watermark_state.has_watermark();
        if pk_prefix_has_watermark || non_pk_prefix_has_watermark {
            let unused = vec![];
            let unused_put = HummockValue::Put(unused.as_slice());
            if (pk_prefix_has_watermark
                && self
                    .pk_prefix_skip_watermark_state
                    .should_delete(key, unused_put))
                || (non_pk_prefix_has_watermark
                    && self
                        .non_pk_prefix_skip_watermark_state
                        .should_delete(key, unused_put))
            {
                return true;
            }
        }
        self.value_skip_watermark_state.has_watermark()
            && self.value_skip_watermark_state.may_delete(key)
    }

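    /// Exact check used during key-by-key merging: returns `true` if any of the pk-prefix,
    /// non-pk-prefix, or value watermarks decides that this key-value pair must be dropped.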
    fn watermark_should_delete(
        &mut self,
        key: &FullKey<&[u8]>,
        value: HummockValue<&[u8]>,
    ) -> bool {
        (self.pk_prefix_skip_watermark_state.has_watermark()
            && self
                .pk_prefix_skip_watermark_state
                .should_delete(key, value))
            || (self.non_pk_prefix_skip_watermark_state.has_watermark()
                && self
                    .non_pk_prefix_skip_watermark_state
                    .should_delete(key, value))
            || (self.value_skip_watermark_state.has_watermark()
                && self.value_skip_watermark_state.should_delete(key, value))
    }
}