use std::cmp::Ordering;
use std::collections::HashSet;
use std::marker::PhantomData;
use std::sync::atomic::AtomicU64;
use std::sync::{Arc, atomic};
use std::time::Instant;

use await_tree::{InstrumentAwait, SpanExt};
use bytes::Bytes;
use fail::fail_point;
use itertools::Itertools;
use risingwave_common::catalog::TableId;
use risingwave_hummock_sdk::compact_task::CompactTask;
use risingwave_hummock_sdk::key::FullKey;
use risingwave_hummock_sdk::key_range::KeyRange;
use risingwave_hummock_sdk::sstable_info::SstableInfo;
use risingwave_hummock_sdk::table_stats::TableStats;
use risingwave_hummock_sdk::{EpochWithGap, LocalSstableInfo, can_concat, compact_task_to_string};

use crate::compaction_catalog_manager::CompactionCatalogAgentRef;
use crate::hummock::block_stream::BlockDataStream;
use crate::hummock::compactor::task_progress::TaskProgress;
use crate::hummock::compactor::{
    CompactionFilter, CompactionStatistics, Compactor, CompactorContext, MultiCompactionFilter,
    RemoteBuilderFactory, TaskConfig,
};
use crate::hummock::iterator::{
    NonPkPrefixSkipWatermarkState, PkPrefixSkipWatermarkState, SkipWatermarkState,
};
use crate::hummock::multi_builder::{CapacitySplitTableBuilder, TableBuilderFactory};
use crate::hummock::sstable_store::SstableStoreRef;
use crate::hummock::value::HummockValue;
use crate::hummock::{
    Block, BlockBuilder, BlockHolder, BlockIterator, BlockMeta, BlockedXor16FilterBuilder,
    CachePolicy, CompressionAlgorithm, GetObjectId, HummockResult, SstableBuilderOptions,
    TableHolder, UnifiedSstableWriterFactory,
};
use crate::monitor::{CompactorMetrics, StoreLocalStatistic};

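/// Streams the data blocks of one SSTable from the object store, exposing each block either as
/// raw (still compressed) bytes together with its filter data and `BlockMeta`, or through a
/// decoded per-block `BlockIterator`.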
pub struct BlockStreamIterator {
    block_stream: Option<BlockDataStream>,

    next_block_index: usize,

    sstable: TableHolder,
    iter: Option<BlockIterator>,
    task_progress: Arc<TaskProgress>,

    sstable_store: SstableStoreRef,
    sstable_info: SstableInfo,
    io_retry_times: usize,
    max_io_retry_times: usize,
    stats_ptr: Arc<AtomicU64>,
}

impl BlockStreamIterator {
    pub fn new(
        sstable: TableHolder,
        task_progress: Arc<TaskProgress>,
        sstable_store: SstableStoreRef,
        sstable_info: SstableInfo,
        max_io_retry_times: usize,
        stats_ptr: Arc<AtomicU64>,
    ) -> Self {
        Self {
            block_stream: None,
            next_block_index: 0,
            sstable,
            iter: None,
            task_progress,
            sstable_store,
            sstable_info,
            io_retry_times: 0,
            max_io_retry_times,
            stats_ptr,
        }
    }

    async fn create_stream(&mut self) -> HummockResult<()> {
        let block_stream = self
            .sstable_store
            .get_stream_for_blocks(
                self.sstable_info.object_id,
                &self.sstable.meta.block_metas[self.next_block_index..],
            )
            .instrument_await("stream_iter_get_stream".verbose())
            .await?;
        self.block_stream = Some(block_stream);
        Ok(())
    }

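    /// Downloads the next block, returning its raw (possibly compressed) data, its raw filter
    /// bytes and its `BlockMeta`, or `Ok(None)` once the SSTable is exhausted. The block stream
    /// is created lazily and recreated up to `max_io_retry_times` times on object-store errors;
    /// the elapsed time is accumulated into `stats_ptr` in milliseconds.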
    pub(crate) async fn download_next_block(
        &mut self,
    ) -> HummockResult<Option<(Bytes, Vec<u8>, BlockMeta)>> {
        let now = Instant::now();
        let _time_stat = scopeguard::guard(self.stats_ptr.clone(), |stats_ptr: Arc<AtomicU64>| {
            let add = (now.elapsed().as_secs_f64() * 1000.0).ceil();
            stats_ptr.fetch_add(add as u64, atomic::Ordering::Relaxed);
        });
        loop {
            let ret = match &mut self.block_stream {
                Some(block_stream) => block_stream.next_block_impl().await,
                None => {
                    self.create_stream().await?;
                    continue;
                }
            };
            match ret {
                Ok(Some((data, _))) => {
                    let meta = self.sstable.meta.block_metas[self.next_block_index].clone();
                    let filter_block = self
                        .sstable
                        .filter_reader
                        .get_block_raw_filter(self.next_block_index);
                    self.next_block_index += 1;
                    return Ok(Some((data, filter_block, meta)));
                }

                Ok(None) => break,

                Err(e) => {
                    if !e.is_object_error() || self.io_retry_times >= self.max_io_retry_times {
                        return Err(e);
                    }

                    self.block_stream.take();
                    self.io_retry_times += 1;
                    fail_point!("create_stream_err");

                    tracing::warn!(
                        "fast compact retry create stream for sstable {} times, sstinfo={}",
                        self.io_retry_times,
                        format!(
                            "object_id={}, sst_id={}, meta_offset={}, table_ids={:?}",
                            self.sstable_info.object_id,
                            self.sstable_info.sst_id,
                            self.sstable_info.meta_offset,
                            self.sstable_info.table_ids
                        )
                    );
                }
            }
        }

        self.next_block_index = self.sstable.meta.block_metas.len();
        self.iter.take();
        Ok(None)
    }

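    /// Decodes a downloaded block and positions a `BlockIterator` at its first entry, switching
    /// this SSTable from raw-block copying to key-by-key iteration.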
    pub(crate) fn init_block_iter(
        &mut self,
        buf: Bytes,
        uncompressed_capacity: usize,
    ) -> HummockResult<()> {
        let block = Block::decode(buf, uncompressed_capacity)?;
        let mut iter = BlockIterator::new(BlockHolder::from_owned_block(Box::new(block)));
        iter.seek_to_first();
        self.iter = Some(iter);
        Ok(())
    }

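    /// Smallest full key of the block that `download_next_block` would return next.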
    fn next_block_smallest(&self) -> &[u8] {
        self.sstable.meta.block_metas[self.next_block_index]
            .smallest_key
            .as_ref()
    }

    fn next_block_largest(&self) -> &[u8] {
        if self.next_block_index + 1 < self.sstable.meta.block_metas.len() {
            self.sstable.meta.block_metas[self.next_block_index + 1]
                .smallest_key
                .as_ref()
        } else {
            self.sstable.meta.largest_key.as_ref()
        }
    }

    fn current_block_largest(&self) -> Vec<u8> {
        if self.next_block_index < self.sstable.meta.block_metas.len() {
            let mut largest_key = FullKey::decode(
                self.sstable.meta.block_metas[self.next_block_index]
                    .smallest_key
                    .as_ref(),
            );
            largest_key.epoch_with_gap = EpochWithGap::new_max_epoch();
            largest_key.encode()
        } else {
            self.sstable.meta.largest_key.clone()
        }
    }

    fn key(&self) -> FullKey<&[u8]> {
        match self.iter.as_ref() {
            Some(iter) => iter.key(),
            None => FullKey::decode(
                self.sstable.meta.block_metas[self.next_block_index]
                    .smallest_key
                    .as_ref(),
            ),
        }
    }

    pub(crate) fn is_valid(&self) -> bool {
        self.iter.is_some() || self.next_block_index < self.sstable.meta.block_metas.len()
    }

    #[cfg(test)]
    #[cfg(feature = "failpoints")]
    pub(crate) fn iter_mut(&mut self) -> &mut BlockIterator {
        self.iter.as_mut().unwrap()
    }
}

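// Balances the `inc_num_pending_read_io` call made in `ConcatSstableIterator::seek_idx` when
// this iterator was created.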
impl Drop for BlockStreamIterator {
    fn drop(&mut self) {
        self.task_progress.dec_num_pending_read_io();
    }
}

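/// Iterates over a sorted run of SSTables, holding at most one `BlockStreamIterator` (and thus
/// one open block stream) at a time.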
pub struct ConcatSstableIterator {
    sstable_iter: Option<BlockStreamIterator>,

    cur_idx: usize,

    sstables: Vec<SstableInfo>,

    sstable_store: SstableStoreRef,

    stats: StoreLocalStatistic,
    task_progress: Arc<TaskProgress>,

    max_io_retry_times: usize,
}

impl ConcatSstableIterator {
    pub fn new(
        sst_infos: Vec<SstableInfo>,
        sstable_store: SstableStoreRef,
        task_progress: Arc<TaskProgress>,
        max_io_retry_times: usize,
    ) -> Self {
        Self {
            sstable_iter: None,
            cur_idx: 0,
            sstables: sst_infos,
            sstable_store,
            task_progress,
            stats: StoreLocalStatistic::default(),
            max_io_retry_times,
        }
    }

    pub async fn rewind(&mut self) -> HummockResult<()> {
        self.seek_idx(0).await
    }

    pub async fn next_sstable(&mut self) -> HummockResult<()> {
        self.seek_idx(self.cur_idx + 1).await
    }

    pub fn current_sstable(&mut self) -> &mut BlockStreamIterator {
        self.sstable_iter.as_mut().unwrap()
    }

    pub async fn init_block_iter(&mut self) -> HummockResult<()> {
        if let Some(sstable) = self.sstable_iter.as_mut() {
            if sstable.iter.is_some() {
                return Ok(());
            }
            let (buf, _, meta) = sstable.download_next_block().await?.unwrap();
            sstable.init_block_iter(buf, meta.uncompressed_size as usize)?;
        }
        Ok(())
    }

    pub fn is_valid(&self) -> bool {
        self.cur_idx < self.sstables.len()
    }

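    /// Fetches the metadata of the `idx`-th SSTable and opens a `BlockStreamIterator` on it; an
    /// out-of-range `idx` simply leaves `sstable_iter` empty so that `is_valid` returns false.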
    async fn seek_idx(&mut self, idx: usize) -> HummockResult<()> {
        self.sstable_iter.take();
        self.cur_idx = idx;
        if self.cur_idx < self.sstables.len() {
            let sstable_info = &self.sstables[self.cur_idx];
            let sstable = self
                .sstable_store
                .sstable(sstable_info, &mut self.stats)
                .instrument_await("stream_iter_sstable".verbose())
                .await?;
            self.task_progress.inc_num_pending_read_io();

            let sstable_iter = BlockStreamIterator::new(
                sstable,
                self.task_progress.clone(),
                self.sstable_store.clone(),
                sstable_info.clone(),
                self.max_io_retry_times,
                self.stats.remote_io_time.clone(),
            );
            self.sstable_iter = Some(sstable_iter);
        }
        Ok(())
    }
}

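/// Fast compaction runner. It merges exactly two sorted inputs and, whenever a whole block of
/// one input can be placed below the other input's current key, copies that block verbatim into
/// the output instead of decoding and re-encoding every key-value pair.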
pub struct CompactorRunner<C: CompactionFilter = MultiCompactionFilter> {
    left: Box<ConcatSstableIterator>,
    right: Box<ConcatSstableIterator>,
    task_id: u64,
    executor: CompactTaskExecutor<
        RemoteBuilderFactory<UnifiedSstableWriterFactory, BlockedXor16FilterBuilder>,
        C,
    >,
    compression_algorithm: CompressionAlgorithm,
    metrics: Arc<CompactorMetrics>,
}

impl<C: CompactionFilter> CompactorRunner<C> {
    pub fn new(
        context: CompactorContext,
        task: CompactTask,
        compaction_catalog_agent_ref: CompactionCatalogAgentRef,
        object_id_getter: Arc<dyn GetObjectId>,
        task_progress: Arc<TaskProgress>,
        compaction_filter: C,
    ) -> Self {
        let mut options: SstableBuilderOptions = context.storage_opts.as_ref().into();
        let compression_algorithm: CompressionAlgorithm = task.compression_algorithm.into();
        options.compression_algorithm = compression_algorithm;
        options.capacity = task.target_file_size as usize;
        let get_id_time = Arc::new(AtomicU64::new(0));

        let key_range = KeyRange::inf();

        let task_config = TaskConfig {
            key_range,
            cache_policy: CachePolicy::NotFill,
            gc_delete_keys: task.gc_delete_keys,
            retain_multiple_version: false,
            stats_target_table_ids: Some(HashSet::from_iter(task.existing_table_ids.clone())),
            task_type: task.task_type,
            table_vnode_partition: task.table_vnode_partition.clone(),
            use_block_based_filter: true,
            table_schemas: Default::default(),
            disable_drop_column_optimization: false,
        };
        let factory = UnifiedSstableWriterFactory::new(context.sstable_store.clone());

        let builder_factory = RemoteBuilderFactory::<_, BlockedXor16FilterBuilder> {
            object_id_getter,
            limiter: context.memory_limiter.clone(),
            options,
            policy: task_config.cache_policy,
            remote_rpc_cost: get_id_time,
            compaction_catalog_agent_ref: compaction_catalog_agent_ref.clone(),
            sstable_writer_factory: factory,
            _phantom: PhantomData,
        };
        let sst_builder = CapacitySplitTableBuilder::new(
            builder_factory,
            context.compactor_metrics.clone(),
            Some(task_progress.clone()),
            task_config.table_vnode_partition.clone(),
            context
                .storage_opts
                .compactor_concurrent_uploading_sst_count,
            compaction_catalog_agent_ref.clone(),
        );
        assert_eq!(
            task.input_ssts.len(),
            2,
            "TaskId {} target_level {:?} task {:?}",
            task.task_id,
            task.target_level,
            compact_task_to_string(&task)
        );
        let left = Box::new(ConcatSstableIterator::new(
            task.input_ssts[0].table_infos.clone(),
            context.sstable_store.clone(),
            task_progress.clone(),
            context.storage_opts.compactor_iter_max_io_retry_times,
        ));
        let right = Box::new(ConcatSstableIterator::new(
            task.input_ssts[1].table_infos.clone(),
            context.sstable_store,
            task_progress.clone(),
            context.storage_opts.compactor_iter_max_io_retry_times,
        ));

        let state = PkPrefixSkipWatermarkState::from_safe_epoch_watermarks(
            task.pk_prefix_table_watermarks.clone(),
        );
        let non_pk_prefix_state = NonPkPrefixSkipWatermarkState::from_safe_epoch_watermarks(
            task.non_pk_prefix_table_watermarks.clone(),
            compaction_catalog_agent_ref,
        );

        Self {
            executor: CompactTaskExecutor::new(
                sst_builder,
                task_config,
                task_progress,
                state,
                non_pk_prefix_state,
                compaction_filter,
            ),
            left,
            right,
            task_id: task.task_id,
            metrics: context.compactor_metrics,
            compression_algorithm,
        }
    }

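    /// Drives both inputs to completion. While both sides are valid, the side with the smaller
    /// current key is advanced: blocks that end before the other side's key and pass
    /// `shall_copy_raw_block` are appended as raw blocks, everything else goes through the
    /// key-by-key `CompactTaskExecutor::run` path. The remaining input is then drained the same
    /// way, and the resulting SSTs are asserted to be concatenatable.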
    pub async fn run(mut self) -> HummockResult<(Vec<LocalSstableInfo>, CompactionStatistics)> {
        self.left.rewind().await?;
        self.right.rewind().await?;
        let mut skip_raw_block_count = 0;
        let mut skip_raw_block_size = 0;
        while self.left.is_valid() && self.right.is_valid() {
            let ret = self
                .left
                .current_sstable()
                .key()
                .cmp(&self.right.current_sstable().key());
            let (first, second) = if ret == Ordering::Less {
                (&mut self.left, &mut self.right)
            } else {
                (&mut self.right, &mut self.left)
            };
            assert!(
                ret != Ordering::Equal,
                "sst range overlap equal_key {:?}",
                self.left.current_sstable().key()
            );
            if first.current_sstable().iter.is_none() {
                let right_key = second.current_sstable().key();
                while first.current_sstable().is_valid() && !self.executor.builder.need_flush() {
                    let full_key = FullKey::decode(first.current_sstable().next_block_largest());
                    if full_key.user_key.ge(&right_key.user_key) {
                        break;
                    }
                    let smallest_key =
                        FullKey::decode(first.current_sstable().next_block_smallest());
                    if !self.executor.shall_copy_raw_block(&smallest_key) {
                        break;
                    }
                    let smallest_key = smallest_key.to_vec();

                    let (mut block, filter_data, mut meta) = first
                        .current_sstable()
                        .download_next_block()
                        .await?
                        .unwrap();
                    let algorithm = Block::get_algorithm(&block)?;
                    if algorithm == CompressionAlgorithm::None
                        && algorithm != self.compression_algorithm
                    {
                        block = BlockBuilder::compress_block(block, self.compression_algorithm)?;
                        meta.len = block.len() as u32;
                    }

                    let largest_key = first.current_sstable().current_block_largest();
                    let block_len = block.len() as u64;
                    let block_key_count = meta.total_key_count;

                    if self
                        .executor
                        .builder
                        .add_raw_block(block, filter_data, smallest_key, largest_key, meta)
                        .await?
                    {
                        skip_raw_block_size += block_len;
                        skip_raw_block_count += 1;
                    }
                    self.executor.may_report_process_key(block_key_count);
                    self.executor.clear();
                }
                if !first.current_sstable().is_valid() {
                    first.next_sstable().await?;
                    continue;
                }
                first.init_block_iter().await?;
            }

            let target_key = second.current_sstable().key();
            let iter = first.sstable_iter.as_mut().unwrap().iter.as_mut().unwrap();
            self.executor.run(iter, target_key).await?;
            if !iter.is_valid() {
                first.sstable_iter.as_mut().unwrap().iter.take();
                if !first.current_sstable().is_valid() {
                    first.next_sstable().await?;
                }
            }
        }
        let rest_data = if !self.left.is_valid() {
            &mut self.right
        } else {
            &mut self.left
        };
        if rest_data.is_valid() {
            let sstable_iter = rest_data.sstable_iter.as_mut().unwrap();
            let target_key = FullKey::decode(&sstable_iter.sstable.meta.largest_key);
            if let Some(iter) = sstable_iter.iter.as_mut() {
                self.executor.run(iter, target_key).await?;
                assert!(
                    !iter.is_valid(),
                    "iter should not be valid key {:?}",
                    iter.key()
                );
            }
            sstable_iter.iter.take();
        }

        while rest_data.is_valid() {
            let mut sstable_iter = rest_data.sstable_iter.take().unwrap();
            while sstable_iter.is_valid() {
                let smallest_key = FullKey::decode(sstable_iter.next_block_smallest()).to_vec();
                let (block, filter_data, block_meta) =
                    sstable_iter.download_next_block().await?.unwrap();
                let need_deleted = self.executor.last_key.user_key.eq(&smallest_key.user_key)
                    && self.executor.last_key_is_delete;
                if self.executor.builder.need_flush()
                    || need_deleted
                    || !self.executor.shall_copy_raw_block(&smallest_key.to_ref())
                {
                    let largest_key = sstable_iter.sstable.meta.largest_key.clone();
                    let target_key = FullKey::decode(&largest_key);
                    sstable_iter.init_block_iter(block, block_meta.uncompressed_size as usize)?;
                    let mut iter = sstable_iter.iter.take().unwrap();
                    self.executor.run(&mut iter, target_key).await?;
                } else {
                    let largest_key = sstable_iter.current_block_largest();
                    let block_len = block.len() as u64;
                    let block_key_count = block_meta.total_key_count;
                    if self
                        .executor
                        .builder
                        .add_raw_block(block, filter_data, smallest_key, largest_key, block_meta)
                        .await?
                    {
                        skip_raw_block_count += 1;
                        skip_raw_block_size += block_len;
                    }
                    self.executor.may_report_process_key(block_key_count);
                    self.executor.clear();
                }
            }
            rest_data.next_sstable().await?;
        }
        let mut total_read_bytes = 0;
        for sst in &self.left.sstables {
            total_read_bytes += sst.sst_size;
        }
        for sst in &self.right.sstables {
            total_read_bytes += sst.sst_size;
        }
        self.metrics
            .compact_fast_runner_bytes
            .inc_by(skip_raw_block_size);
        tracing::info!(
            "OPTIMIZATION: skip {} blocks for task-{}, optimize {}% data compression",
            skip_raw_block_count,
            self.task_id,
            skip_raw_block_size * 100 / total_read_bytes,
        );

        let statistic = self.executor.take_statistics();
        let output_ssts = self.executor.builder.finish().await?;
        Compactor::report_progress(
            self.metrics.clone(),
            Some(self.executor.task_progress.clone()),
            &output_ssts,
            false,
        );
        let sst_infos = output_ssts
            .iter()
            .map(|sst| sst.sst_info.clone())
            .collect_vec();
        assert!(can_concat(&sst_infos));
        Ok((output_ssts, statistic))
    }
}

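/// Key-by-key executor shared by both output paths: it applies version retention, delete-key GC,
/// watermark filtering and the user compaction filter, feeds surviving entries into the SST
/// builder, and collects per-table drop statistics.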
pub struct CompactTaskExecutor<F: TableBuilderFactory, C: CompactionFilter> {
    last_key: FullKey<Vec<u8>>,
    compaction_statistics: CompactionStatistics,
    last_table_id: Option<TableId>,
    last_table_stats: TableStats,
    builder: CapacitySplitTableBuilder<F>,
    task_config: TaskConfig,
    task_progress: Arc<TaskProgress>,
    skip_watermark_state: PkPrefixSkipWatermarkState,
    last_key_is_delete: bool,
    progress_key_num: u32,
    non_pk_prefix_skip_watermark_state: NonPkPrefixSkipWatermarkState,
    compaction_filter: C,
}

impl<F: TableBuilderFactory, C: CompactionFilter> CompactTaskExecutor<F, C> {
    pub fn new(
        builder: CapacitySplitTableBuilder<F>,
        task_config: TaskConfig,
        task_progress: Arc<TaskProgress>,
        skip_watermark_state: PkPrefixSkipWatermarkState,
        non_pk_prefix_skip_watermark_state: NonPkPrefixSkipWatermarkState,
        compaction_filter: C,
    ) -> Self {
        Self {
            builder,
            task_config,
            last_key: FullKey::default(),
            last_key_is_delete: false,
            compaction_statistics: CompactionStatistics::default(),
            last_table_id: None,
            last_table_stats: TableStats::default(),
            task_progress,
            skip_watermark_state,
            progress_key_num: 0,
            non_pk_prefix_skip_watermark_state,
            compaction_filter,
        }
    }

    fn take_statistics(&mut self) -> CompactionStatistics {
        if let Some(last_table_id) = self.last_table_id.take() {
            self.compaction_statistics
                .delta_drop_stat
                .insert(last_table_id, std::mem::take(&mut self.last_table_stats));
        }
        std::mem::take(&mut self.compaction_statistics)
    }

    fn clear(&mut self) {
        if !self.last_key.is_empty() {
            self.last_key = FullKey::default();
        }
        self.last_key_is_delete = false;
    }

    #[inline(always)]
    fn may_report_process_key(&mut self, key_count: u32) {
        const PROGRESS_KEY_INTERVAL: u32 = 100;
        self.progress_key_num += key_count;
        if self.progress_key_num > PROGRESS_KEY_INTERVAL {
            self.task_progress
                .inc_progress_key(self.progress_key_num as u64);
            self.progress_key_num = 0;
        }
    }

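    /// Consumes `iter` while its key is `<= target_key`, deciding per entry whether to drop it
    /// (older version of a key, GC-able delete, watermark, or compaction filter) or to add it to
    /// the builder.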
    pub async fn run(
        &mut self,
        iter: &mut BlockIterator,
        target_key: FullKey<&[u8]>,
    ) -> HummockResult<()> {
        self.skip_watermark_state.reset_watermark();
        self.non_pk_prefix_skip_watermark_state.reset_watermark();

        while iter.is_valid() && iter.key().le(&target_key) {
            let is_new_user_key =
                !self.last_key.is_empty() && iter.key().user_key != self.last_key.user_key.as_ref();
            self.compaction_statistics.iter_total_key_counts += 1;
            self.may_report_process_key(1);

            let mut drop = false;
            let value = HummockValue::from_slice(iter.value()).unwrap();
            let is_first_or_new_user_key = is_new_user_key || self.last_key.is_empty();
            if is_first_or_new_user_key {
                self.last_key.set(iter.key());
                self.last_key_is_delete = false;
            }

            if !self.task_config.retain_multiple_version
                && self.task_config.gc_delete_keys
                && value.is_delete()
            {
                drop = true;
                self.last_key_is_delete = true;
            } else if !self.task_config.retain_multiple_version && !is_first_or_new_user_key {
                drop = true;
            }

            if !drop && self.compaction_filter.should_delete(iter.key()) {
                drop = true;
            }

            if !drop && self.watermark_should_delete(&iter.key()) {
                drop = true;
                self.last_key_is_delete = true;
            }

            if self.last_table_id != Some(self.last_key.user_key.table_id) {
                if let Some(last_table_id) = self.last_table_id.take() {
                    self.compaction_statistics
                        .delta_drop_stat
                        .insert(last_table_id, std::mem::take(&mut self.last_table_stats));
                }
                self.last_table_id = Some(self.last_key.user_key.table_id);
            }

            if drop {
                self.compaction_statistics.iter_drop_key_counts += 1;

                let should_count = match self.task_config.stats_target_table_ids.as_ref() {
                    Some(target_table_ids) => {
                        target_table_ids.contains(&self.last_key.user_key.table_id)
                    }
                    None => true,
                };
                if should_count {
                    self.last_table_stats.total_key_count -= 1;
                    self.last_table_stats.total_key_size -= self.last_key.encoded_len() as i64;
                    self.last_table_stats.total_value_size -= value.encoded_len() as i64;
                }
                iter.next();
                continue;
            }
            self.builder
                .add_full_key(iter.key(), value, is_new_user_key)
                .await?;
            iter.next();
        }
        Ok(())
    }

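    /// Whether the block starting at `smallest_key` may be copied without inspection: not if its
    /// first user key equals the last key that was dropped as a delete, and not if that key would
    /// be dropped by a watermark or by the compaction filter.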
    pub fn shall_copy_raw_block(&mut self, smallest_key: &FullKey<&[u8]>) -> bool {
        if self.last_key_is_delete && self.last_key.user_key.as_ref().eq(&smallest_key.user_key) {
            return false;
        }

        if self.watermark_should_delete(smallest_key) {
            return false;
        }

        if self.compaction_filter.should_delete(*smallest_key) {
            return false;
        }

        true
    }

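    /// True if either the pk-prefix or the non-pk-prefix watermark state marks `key` as below its
    /// table watermark.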
    fn watermark_should_delete(&mut self, key: &FullKey<&[u8]>) -> bool {
        (self.skip_watermark_state.has_watermark() && self.skip_watermark_state.should_delete(key))
            || (self.non_pk_prefix_skip_watermark_state.has_watermark()
                && self.non_pk_prefix_skip_watermark_state.should_delete(key))
    }
}