use std::cmp::Ordering;
use std::collections::HashSet;
use std::marker::PhantomData;
use std::sync::atomic::AtomicU64;
use std::sync::{Arc, atomic};
use std::time::Instant;

use await_tree::{InstrumentAwait, SpanExt};
use bytes::Bytes;
use fail::fail_point;
use itertools::Itertools;
use risingwave_hummock_sdk::compact_task::CompactTask;
use risingwave_hummock_sdk::key::FullKey;
use risingwave_hummock_sdk::key_range::KeyRange;
use risingwave_hummock_sdk::sstable_info::SstableInfo;
use risingwave_hummock_sdk::table_stats::TableStats;
use risingwave_hummock_sdk::{EpochWithGap, LocalSstableInfo, can_concat, compact_task_to_string};

use crate::compaction_catalog_manager::CompactionCatalogAgentRef;
use crate::hummock::block_stream::BlockDataStream;
use crate::hummock::compactor::task_progress::TaskProgress;
use crate::hummock::compactor::{
    CompactionFilter, CompactionStatistics, Compactor, CompactorContext, MultiCompactionFilter,
    RemoteBuilderFactory, TaskConfig,
};
use crate::hummock::iterator::{
    NonPkPrefixSkipWatermarkState, PkPrefixSkipWatermarkState, SkipWatermarkState,
};
use crate::hummock::multi_builder::{CapacitySplitTableBuilder, TableBuilderFactory};
use crate::hummock::sstable_store::SstableStoreRef;
use crate::hummock::value::HummockValue;
use crate::hummock::{
    Block, BlockBuilder, BlockHolder, BlockIterator, BlockMeta, BlockedXor16FilterBuilder,
    CachePolicy, CompressionAlgorithm, GetObjectId, HummockResult, SstableBuilderOptions,
    TableHolder, UnifiedSstableWriterFactory,
};
use crate::monitor::{CompactorMetrics, StoreLocalStatistic};

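/// Iterates over the KV pairs of an SST while streaming its blocks from the object store.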
pub struct BlockStreamIterator {
    /// The stream that downloads the remaining blocks of the SST; recreated on IO retry.
    block_stream: Option<BlockDataStream>,

    /// Index of the next block to download.
    next_block_index: usize,

    /// The SST (meta and filter) being iterated.
    sstable: TableHolder,
    iter: Option<BlockIterator>,
    task_progress: Arc<TaskProgress>,

    // Kept to recreate `block_stream` when a retryable object-store error occurs.
    sstable_store: SstableStoreRef,
    sstable_info: SstableInfo,
    io_retry_times: usize,
    max_io_retry_times: usize,
    stats_ptr: Arc<AtomicU64>,
}

impl BlockStreamIterator {
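    /// Constructs a new `BlockStreamIterator` for the given SST. The block stream itself is
    /// created lazily on the first call to `download_next_block`.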
    pub fn new(
        sstable: TableHolder,
        task_progress: Arc<TaskProgress>,
        sstable_store: SstableStoreRef,
        sstable_info: SstableInfo,
        max_io_retry_times: usize,
        stats_ptr: Arc<AtomicU64>,
    ) -> Self {
        Self {
            block_stream: None,
            next_block_index: 0,
            sstable,
            iter: None,
            task_progress,
            sstable_store,
            sstable_info,
            io_retry_times: 0,
            max_io_retry_times,
            stats_ptr,
        }
    }

    async fn create_stream(&mut self) -> HummockResult<()> {
        let block_stream = self
            .sstable_store
            .get_stream_for_blocks(
                self.sstable_info.object_id,
                &self.sstable.meta.block_metas[self.next_block_index..],
            )
            .instrument_await("stream_iter_get_stream".verbose())
            .await?;
        self.block_stream = Some(block_stream);
        Ok(())
    }

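    /// Downloads the next block from the stream, returning its compressed data, raw filter data
    /// and metadata. Records the elapsed remote IO time in `stats_ptr` (in milliseconds) and
    /// recreates the stream on retryable object-store errors, up to `max_io_retry_times` times.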
    pub(crate) async fn download_next_block(
        &mut self,
    ) -> HummockResult<Option<(Bytes, Vec<u8>, BlockMeta)>> {
        let now = Instant::now();
        let _time_stat = scopeguard::guard(self.stats_ptr.clone(), |stats_ptr: Arc<AtomicU64>| {
            let add = (now.elapsed().as_secs_f64() * 1000.0).ceil();
            stats_ptr.fetch_add(add as u64, atomic::Ordering::Relaxed);
        });
        loop {
            let ret = match &mut self.block_stream {
                Some(block_stream) => block_stream.next_block_impl().await,
                None => {
                    self.create_stream().await?;
                    continue;
                }
            };
            match ret {
                Ok(Some((data, _))) => {
                    let meta = self.sstable.meta.block_metas[self.next_block_index].clone();
                    let filter_block = self
                        .sstable
                        .filter_reader
                        .get_block_raw_filter(self.next_block_index);
                    self.next_block_index += 1;
                    return Ok(Some((data, filter_block, meta)));
                }

                Ok(None) => break,

                Err(e) => {
                    // Only object-store errors are retryable; drop the stream and recreate it
                    // on the next loop iteration.
                    if !e.is_object_error() || self.io_retry_times >= self.max_io_retry_times {
                        return Err(e);
                    }

                    self.block_stream.take();
                    self.io_retry_times += 1;
                    fail_point!("create_stream_err");

                    tracing::warn!(
                        "fast compact retry create stream for sstable {} times, sstinfo={}",
                        self.io_retry_times,
                        format!(
                            "object_id={}, sst_id={}, meta_offset={}, table_ids={:?}",
                            self.sstable_info.object_id,
                            self.sstable_info.sst_id,
                            self.sstable_info.meta_offset,
                            self.sstable_info.table_ids
                        )
                    );
                }
            }
        }

        self.next_block_index = self.sstable.meta.block_metas.len();
        self.iter.take();
        Ok(None)
    }

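    /// Decodes a downloaded block and positions a `BlockIterator` at its first entry.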
    pub(crate) fn init_block_iter(
        &mut self,
        buf: Bytes,
        uncompressed_capacity: usize,
    ) -> HummockResult<()> {
        let block = Block::decode(buf, uncompressed_capacity)?;
        let mut iter = BlockIterator::new(BlockHolder::from_owned_block(Box::new(block)));
        iter.seek_to_first();
        self.iter = Some(iter);
        Ok(())
    }

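    /// Smallest key of the next (not yet downloaded) block.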
    fn next_block_smallest(&self) -> &[u8] {
        self.sstable.meta.block_metas[self.next_block_index]
            .smallest_key
            .as_ref()
    }

    /// Upper-bound key of the next block: the smallest key of the block after it, or the SST's
    /// largest key if the next block is the last one.
    fn next_block_largest(&self) -> &[u8] {
        if self.next_block_index + 1 < self.sstable.meta.block_metas.len() {
            self.sstable.meta.block_metas[self.next_block_index + 1]
                .smallest_key
                .as_ref()
        } else {
            self.sstable.meta.largest_key.as_ref()
        }
    }

    /// Upper-bound key of the current block, derived from the smallest key of the next block
    /// (or the SST's largest key for the last block).
    fn current_block_largest(&self) -> Vec<u8> {
        if self.next_block_index < self.sstable.meta.block_metas.len() {
            let mut largest_key = FullKey::decode(
                self.sstable.meta.block_metas[self.next_block_index]
                    .smallest_key
                    .as_ref(),
            );
            // Use the maximum epoch with the next block's smallest user key so the range of the
            // current block does not overlap the keys of the next block.
            largest_key.epoch_with_gap = EpochWithGap::new_max_epoch();
            largest_key.encode()
        } else {
            self.sstable.meta.largest_key.clone()
        }
    }

    fn key(&self) -> FullKey<&[u8]> {
        match self.iter.as_ref() {
            Some(iter) => iter.key(),
            None => FullKey::decode(
                self.sstable.meta.block_metas[self.next_block_index]
                    .smallest_key
                    .as_ref(),
            ),
        }
    }

    pub(crate) fn is_valid(&self) -> bool {
        self.iter.is_some() || self.next_block_index < self.sstable.meta.block_metas.len()
    }

    #[cfg(test)]
    #[cfg(feature = "failpoints")]
    pub(crate) fn iter_mut(&mut self) -> &mut BlockIterator {
        self.iter.as_mut().unwrap()
    }
}

impl Drop for BlockStreamIterator {
    fn drop(&mut self) {
        self.task_progress.dec_num_pending_read_io();
    }
}

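/// Iterates over a sorted run of SSTs in key order, streaming one SST at a time through a
/// `BlockStreamIterator`.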
pub struct ConcatSstableIterator {
    /// Iterator over the SST currently being read.
    sstable_iter: Option<BlockStreamIterator>,

    /// Index of the current SST in `sstables`.
    cur_idx: usize,

    /// All SSTs to iterate, in key order.
    sstables: Vec<SstableInfo>,

    sstable_store: SstableStoreRef,

    stats: StoreLocalStatistic,
    task_progress: Arc<TaskProgress>,

    max_io_retry_times: usize,
}

impl ConcatSstableIterator {
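    /// Creates a new iterator over `sst_infos`. No IO is issued until `rewind` or `seek_idx`
    /// is called.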
    pub fn new(
        sst_infos: Vec<SstableInfo>,
        sstable_store: SstableStoreRef,
        task_progress: Arc<TaskProgress>,
        max_io_retry_times: usize,
    ) -> Self {
        Self {
            sstable_iter: None,
            cur_idx: 0,
            sstables: sst_infos,
            sstable_store,
            task_progress,
            stats: StoreLocalStatistic::default(),
            max_io_retry_times,
        }
    }

    pub async fn rewind(&mut self) -> HummockResult<()> {
        self.seek_idx(0).await
    }

    pub async fn next_sstable(&mut self) -> HummockResult<()> {
        self.seek_idx(self.cur_idx + 1).await
    }

    pub fn current_sstable(&mut self) -> &mut BlockStreamIterator {
        self.sstable_iter.as_mut().unwrap()
    }

    pub async fn init_block_iter(&mut self) -> HummockResult<()> {
        if let Some(sstable) = self.sstable_iter.as_mut() {
            if sstable.iter.is_some() {
                return Ok(());
            }
            let (buf, _, meta) = sstable.download_next_block().await?.unwrap();
            sstable.init_block_iter(buf, meta.uncompressed_size as usize)?;
        }
        Ok(())
    }

    pub fn is_valid(&self) -> bool {
        self.cur_idx < self.sstables.len()
    }

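    /// Resets the iterator to the SST at `idx`, fetching its metadata and opening a new
    /// `BlockStreamIterator`. If `idx` is out of range, the iterator becomes invalid.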
    async fn seek_idx(&mut self, idx: usize) -> HummockResult<()> {
        self.sstable_iter.take();
        self.cur_idx = idx;
        if self.cur_idx < self.sstables.len() {
            let sstable_info = &self.sstables[self.cur_idx];
            let sstable = self
                .sstable_store
                .sstable(sstable_info, &mut self.stats)
                .instrument_await("stream_iter_sstable".verbose())
                .await?;
            self.task_progress.inc_num_pending_read_io();

            let sstable_iter = BlockStreamIterator::new(
                sstable,
                self.task_progress.clone(),
                self.sstable_store.clone(),
                sstable_info.clone(),
                self.max_io_retry_times,
                self.stats.remote_io_time.clone(),
            );
            self.sstable_iter = Some(sstable_iter);
        }
        Ok(())
    }
}

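/// Fast-compaction runner that merges exactly two sorted runs of SSTs. Blocks whose key ranges do
/// not interleave with keys from the other run are copied into the output without being decoded;
/// overlapping regions are merged key by key.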
pub struct CompactorRunner<C: CompactionFilter = MultiCompactionFilter> {
    left: Box<ConcatSstableIterator>,
    right: Box<ConcatSstableIterator>,
    task_id: u64,
    executor: CompactTaskExecutor<
        RemoteBuilderFactory<UnifiedSstableWriterFactory, BlockedXor16FilterBuilder>,
        C,
    >,
    compression_algorithm: CompressionAlgorithm,
    metrics: Arc<CompactorMetrics>,
}

impl<C: CompactionFilter> CompactorRunner<C> {
    pub fn new(
        context: CompactorContext,
        task: CompactTask,
        compaction_catalog_agent_ref: CompactionCatalogAgentRef,
        object_id_getter: Arc<dyn GetObjectId>,
        task_progress: Arc<TaskProgress>,
        compaction_filter: C,
    ) -> Self {
        let mut options: SstableBuilderOptions = context.storage_opts.as_ref().into();
        let compression_algorithm: CompressionAlgorithm = task.compression_algorithm.into();
        options.compression_algorithm = compression_algorithm;
        options.capacity = task.target_file_size as usize;
        let get_id_time = Arc::new(AtomicU64::new(0));

        let key_range = KeyRange::inf();

        let task_config = TaskConfig {
            key_range,
            cache_policy: CachePolicy::NotFill,
            gc_delete_keys: task.gc_delete_keys,
            retain_multiple_version: false,
            stats_target_table_ids: Some(HashSet::from_iter(task.existing_table_ids.clone())),
            task_type: task.task_type,
            table_vnode_partition: task.table_vnode_partition.clone(),
            use_block_based_filter: true,
            table_schemas: Default::default(),
            disable_drop_column_optimization: false,
        };
        let factory = UnifiedSstableWriterFactory::new(context.sstable_store.clone());

        let builder_factory = RemoteBuilderFactory::<_, BlockedXor16FilterBuilder> {
            object_id_getter,
            limiter: context.memory_limiter.clone(),
            options,
            policy: task_config.cache_policy,
            remote_rpc_cost: get_id_time,
            compaction_catalog_agent_ref: compaction_catalog_agent_ref.clone(),
            sstable_writer_factory: factory,
            _phantom: PhantomData,
        };
        let sst_builder = CapacitySplitTableBuilder::new(
            builder_factory,
            context.compactor_metrics.clone(),
            Some(task_progress.clone()),
            task_config.table_vnode_partition.clone(),
            context
                .storage_opts
                .compactor_concurrent_uploading_sst_count,
            compaction_catalog_agent_ref.clone(),
        );
        assert_eq!(
            task.input_ssts.len(),
            2,
            "TaskId {} target_level {:?} task {:?}",
            task.task_id,
            task.target_level,
            compact_task_to_string(&task)
        );
        let left = Box::new(ConcatSstableIterator::new(
            task.input_ssts[0].table_infos.clone(),
            context.sstable_store.clone(),
            task_progress.clone(),
            context.storage_opts.compactor_iter_max_io_retry_times,
        ));
        let right = Box::new(ConcatSstableIterator::new(
            task.input_ssts[1].table_infos.clone(),
            context.sstable_store,
            task_progress.clone(),
            context.storage_opts.compactor_iter_max_io_retry_times,
        ));

        let state = PkPrefixSkipWatermarkState::from_safe_epoch_watermarks(
            task.pk_prefix_table_watermarks.clone(),
        );
        let non_pk_prefix_state = NonPkPrefixSkipWatermarkState::from_safe_epoch_watermarks(
            task.non_pk_prefix_table_watermarks.clone(),
            compaction_catalog_agent_ref,
        );

        Self {
            executor: CompactTaskExecutor::new(
                sst_builder,
                task_config,
                task_progress,
                state,
                non_pk_prefix_state,
                compaction_filter,
            ),
            left,
            right,
            task_id: task.task_id,
            metrics: context.compactor_metrics.clone(),
            compression_algorithm,
        }
    }

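    /// Runs the compaction: merges the two input runs, copying raw blocks where possible, and
    /// returns the output SSTs together with the compaction statistics.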
    pub async fn run(mut self) -> HummockResult<(Vec<LocalSstableInfo>, CompactionStatistics)> {
        self.left.rewind().await?;
        self.right.rewind().await?;
        let mut skip_raw_block_count = 0;
        let mut skip_raw_block_size = 0;
        while self.left.is_valid() && self.right.is_valid() {
            let ret = self
                .left
                .current_sstable()
                .key()
                .cmp(&self.right.current_sstable().key());
            let (first, second) = if ret == Ordering::Less {
                (&mut self.left, &mut self.right)
            } else {
                (&mut self.right, &mut self.left)
            };
            assert!(
                ret != Ordering::Equal,
                "sst range overlap equal_key {:?}",
                self.left.current_sstable().key()
            );
            if first.current_sstable().iter.is_none() {
                // Copy whole raw blocks from `first` as long as they stay below the current key
                // of `second` and copying them is safe.
                let right_key = second.current_sstable().key();
                while first.current_sstable().is_valid() && !self.executor.builder.need_flush() {
                    let full_key = FullKey::decode(first.current_sstable().next_block_largest());
                    // Stop copying once the block may contain keys at or beyond the other side's
                    // current key.
                    if full_key.user_key.ge(&right_key.user_key) {
                        break;
                    }
                    let smallest_key =
                        FullKey::decode(first.current_sstable().next_block_smallest());
                    if !self.executor.shall_copy_raw_block(&smallest_key) {
                        break;
                    }
                    let smallest_key = smallest_key.to_vec();

                    let (mut block, filter_data, mut meta) = first
                        .current_sstable()
                        .download_next_block()
                        .await?
                        .unwrap();
                    let algorithm = Block::get_algorithm(&block)?;
                    if algorithm == CompressionAlgorithm::None
                        && algorithm != self.compression_algorithm
                    {
                        block = BlockBuilder::compress_block(block, self.compression_algorithm)?;
                        meta.len = block.len() as u32;
                    }

                    let largest_key = first.current_sstable().current_block_largest();
                    let block_len = block.len() as u64;
                    let block_key_count = meta.total_key_count;

                    if self
                        .executor
                        .builder
                        .add_raw_block(block, filter_data, smallest_key, largest_key, meta)
                        .await?
                    {
                        skip_raw_block_size += block_len;
                        skip_raw_block_count += 1;
                    }
                    self.executor.may_report_process_key(block_key_count);
                    self.executor.clear();
                }
                if !first.current_sstable().is_valid() {
                    first.next_sstable().await?;
                    continue;
                }
                first.init_block_iter().await?;
            }

            // Merge keys of `first` up to the current key of `second`.
            let target_key = second.current_sstable().key();
            let iter = first.sstable_iter.as_mut().unwrap().iter.as_mut().unwrap();
            self.executor.run(iter, target_key).await?;
            if !iter.is_valid() {
                first.sstable_iter.as_mut().unwrap().iter.take();
                if !first.current_sstable().is_valid() {
                    first.next_sstable().await?;
                }
            }
        }
        // One side is exhausted; consume the remaining data of the other side.
        let rest_data = if !self.left.is_valid() {
            &mut self.right
        } else {
            &mut self.left
        };
        if rest_data.is_valid() {
            // Drain the block iterator that may still be open for the current SST.
            let sstable_iter = rest_data.sstable_iter.as_mut().unwrap();
            let target_key = FullKey::decode(&sstable_iter.sstable.meta.largest_key);
            if let Some(iter) = sstable_iter.iter.as_mut() {
                self.executor.run(iter, target_key).await?;
                assert!(
                    !iter.is_valid(),
                    "iter should not be valid key {:?}",
                    iter.key()
                );
            }
            sstable_iter.iter.take();
        }

        while rest_data.is_valid() {
            let mut sstable_iter = rest_data.sstable_iter.take().unwrap();
            while sstable_iter.is_valid() {
                let smallest_key = FullKey::decode(sstable_iter.next_block_smallest()).to_vec();
                let (block, filter_data, block_meta) =
                    sstable_iter.download_next_block().await?.unwrap();
                // If the newer version of this block's first user key was just dropped as a
                // delete, the older versions inside this block must also be dropped, so the block
                // cannot be copied verbatim.
                let need_deleted = self.executor.last_key.user_key.eq(&smallest_key.user_key)
                    && self.executor.last_key_is_delete;
                if self.executor.builder.need_flush()
                    || need_deleted
                    || !self.executor.shall_copy_raw_block(&smallest_key.to_ref())
                {
                    let largest_key = sstable_iter.sstable.meta.largest_key.clone();
                    let target_key = FullKey::decode(&largest_key);
                    sstable_iter.init_block_iter(block, block_meta.uncompressed_size as usize)?;
                    let mut iter = sstable_iter.iter.take().unwrap();
                    self.executor.run(&mut iter, target_key).await?;
                } else {
                    let largest_key = sstable_iter.current_block_largest();
                    let block_len = block.len() as u64;
                    let block_key_count = block_meta.total_key_count;
                    if self
                        .executor
                        .builder
                        .add_raw_block(block, filter_data, smallest_key, largest_key, block_meta)
                        .await?
                    {
                        skip_raw_block_count += 1;
                        skip_raw_block_size += block_len;
                    }
                    self.executor.may_report_process_key(block_key_count);
                    self.executor.clear();
                }
            }
            rest_data.next_sstable().await?;
        }
        let mut total_read_bytes = 0;
        for sst in &self.left.sstables {
            total_read_bytes += sst.sst_size;
        }
        for sst in &self.right.sstables {
            total_read_bytes += sst.sst_size;
        }
        self.metrics
            .compact_fast_runner_bytes
            .inc_by(skip_raw_block_size);
        tracing::info!(
            "OPTIMIZATION: skip {} blocks for task-{}, optimize {}% data compression",
            skip_raw_block_count,
            self.task_id,
            skip_raw_block_size * 100 / total_read_bytes,
        );

        let statistic = self.executor.take_statistics();
        let output_ssts = self.executor.builder.finish().await?;
        Compactor::report_progress(
            self.metrics.clone(),
            Some(self.executor.task_progress.clone()),
            &output_ssts,
            false,
        );
        let sst_infos = output_ssts
            .iter()
            .map(|sst| sst.sst_info.clone())
            .collect_vec();
        assert!(can_concat(&sst_infos));
        Ok((output_ssts, statistic))
    }
}

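/// Applies the compaction logic (version GC, delete-key GC, compaction filter and table
/// watermarks) to a stream of keys and feeds the surviving entries into the SST builder.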
pub struct CompactTaskExecutor<F: TableBuilderFactory, C: CompactionFilter> {
    /// The full key most recently observed, used to detect new user keys.
    last_key: FullKey<Vec<u8>>,
    compaction_statistics: CompactionStatistics,
    last_table_id: Option<u32>,
    last_table_stats: TableStats,
    builder: CapacitySplitTableBuilder<F>,
    task_config: TaskConfig,
    task_progress: Arc<TaskProgress>,
    skip_watermark_state: PkPrefixSkipWatermarkState,
    /// Whether the latest version of `last_key` was dropped (delete tombstone or watermark).
    last_key_is_delete: bool,
    progress_key_num: u32,
    non_pk_prefix_skip_watermark_state: NonPkPrefixSkipWatermarkState,
    compaction_filter: C,
}

impl<F: TableBuilderFactory, C: CompactionFilter> CompactTaskExecutor<F, C> {
    pub fn new(
        builder: CapacitySplitTableBuilder<F>,
        task_config: TaskConfig,
        task_progress: Arc<TaskProgress>,
        skip_watermark_state: PkPrefixSkipWatermarkState,
        non_pk_prefix_skip_watermark_state: NonPkPrefixSkipWatermarkState,
        compaction_filter: C,
    ) -> Self {
        Self {
            builder,
            task_config,
            last_key: FullKey::default(),
            last_key_is_delete: false,
            compaction_statistics: CompactionStatistics::default(),
            last_table_id: None,
            last_table_stats: TableStats::default(),
            task_progress,
            skip_watermark_state,
            progress_key_num: 0,
            non_pk_prefix_skip_watermark_state,
            compaction_filter,
        }
    }

    fn take_statistics(&mut self) -> CompactionStatistics {
        if let Some(last_table_id) = self.last_table_id.take() {
            self.compaction_statistics
                .delta_drop_stat
                .insert(last_table_id, std::mem::take(&mut self.last_table_stats));
        }
        std::mem::take(&mut self.compaction_statistics)
    }

    fn clear(&mut self) {
        if !self.last_key.is_empty() {
            self.last_key = FullKey::default();
        }
        self.last_key_is_delete = false;
    }

    #[inline(always)]
    fn may_report_process_key(&mut self, key_count: u32) {
        const PROGRESS_KEY_INTERVAL: u32 = 100;
        self.progress_key_num += key_count;
        if self.progress_key_num > PROGRESS_KEY_INTERVAL {
            self.task_progress
                .inc_progress_key(self.progress_key_num as u64);
            self.progress_key_num = 0;
        }
    }

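    /// Consumes `iter` up to and including `target_key`, applying GC, compaction-filter and
    /// watermark rules to each key and adding the surviving entries to the builder.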
    pub async fn run(
        &mut self,
        iter: &mut BlockIterator,
        target_key: FullKey<&[u8]>,
    ) -> HummockResult<()> {
        self.skip_watermark_state.reset_watermark();
        self.non_pk_prefix_skip_watermark_state.reset_watermark();

        while iter.is_valid() && iter.key().le(&target_key) {
            let is_new_user_key =
                !self.last_key.is_empty() && iter.key().user_key != self.last_key.user_key.as_ref();
            self.compaction_statistics.iter_total_key_counts += 1;
            self.may_report_process_key(1);

            let mut drop = false;
            let value = HummockValue::from_slice(iter.value()).unwrap();
            let is_first_or_new_user_key = is_new_user_key || self.last_key.is_empty();
            if is_first_or_new_user_key {
                self.last_key.set(iter.key());
                self.last_key_is_delete = false;
            }

            // When multiple versions need not be retained, drop delete tombstones (if
            // `gc_delete_keys`) and all non-latest versions of a user key.
            if !self.task_config.retain_multiple_version
                && self.task_config.gc_delete_keys
                && value.is_delete()
            {
                drop = true;
                self.last_key_is_delete = true;
            } else if !self.task_config.retain_multiple_version && !is_first_or_new_user_key {
                drop = true;
            }

            if !drop && self.compaction_filter.should_delete(iter.key()) {
                drop = true;
            }

            if !drop && self.watermark_should_delete(&iter.key()) {
                drop = true;
                self.last_key_is_delete = true;
            }

            if self.last_table_id != Some(self.last_key.user_key.table_id.table_id) {
                if let Some(last_table_id) = self.last_table_id.take() {
                    self.compaction_statistics
                        .delta_drop_stat
                        .insert(last_table_id, std::mem::take(&mut self.last_table_stats));
                }
                self.last_table_id = Some(self.last_key.user_key.table_id.table_id);
            }

            if drop {
                self.compaction_statistics.iter_drop_key_counts += 1;

                let should_count = match self.task_config.stats_target_table_ids.as_ref() {
                    Some(target_table_ids) => {
                        target_table_ids.contains(&self.last_key.user_key.table_id.table_id)
                    }
                    None => true,
                };
                if should_count {
                    self.last_table_stats.total_key_count -= 1;
                    self.last_table_stats.total_key_size -= self.last_key.encoded_len() as i64;
                    self.last_table_stats.total_value_size -= value.encoded_len() as i64;
                }
                iter.next();
                continue;
            }
            self.builder
                .add_full_key(iter.key(), value, is_new_user_key)
                .await?;
            iter.next();
        }
        Ok(())
    }

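    /// Returns whether the block whose first key is `smallest_key` may be copied verbatim: it
    /// must not start with a user key whose newer version was just dropped, and its first key
    /// must not be dropped by the table watermarks or the compaction filter.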
    pub fn shall_copy_raw_block(&mut self, smallest_key: &FullKey<&[u8]>) -> bool {
        if self.last_key_is_delete && self.last_key.user_key.as_ref().eq(&smallest_key.user_key) {
            return false;
        }

        if self.watermark_should_delete(smallest_key) {
            return false;
        }

        if self.compaction_filter.should_delete(*smallest_key) {
            return false;
        }

        true
    }

    fn watermark_should_delete(&mut self, key: &FullKey<&[u8]>) -> bool {
        (self.skip_watermark_state.has_watermark() && self.skip_watermark_state.should_delete(key))
            || (self.non_pk_prefix_skip_watermark_state.has_watermark()
                && self.non_pk_prefix_skip_watermark_state.should_delete(key))
    }
}