1use std::cmp::Ordering;
16use std::collections::HashSet;
17use std::marker::PhantomData;
18use std::sync::atomic::AtomicU64;
19use std::sync::{Arc, atomic};
20use std::time::Instant;
21
22use await_tree::{InstrumentAwait, SpanExt};
23use bytes::Bytes;
24use fail::fail_point;
25use itertools::Itertools;
26use risingwave_hummock_sdk::compact_task::CompactTask;
27use risingwave_hummock_sdk::key::FullKey;
28use risingwave_hummock_sdk::key_range::KeyRange;
29use risingwave_hummock_sdk::sstable_info::SstableInfo;
30use risingwave_hummock_sdk::table_stats::TableStats;
31use risingwave_hummock_sdk::{EpochWithGap, LocalSstableInfo, can_concat, compact_task_to_string};
32
33use crate::compaction_catalog_manager::CompactionCatalogAgentRef;
34use crate::hummock::block_stream::BlockDataStream;
35use crate::hummock::compactor::task_progress::TaskProgress;
36use crate::hummock::compactor::{
37 CompactionStatistics, Compactor, CompactorContext, RemoteBuilderFactory, TaskConfig,
38};
39use crate::hummock::iterator::{
40 NonPkPrefixSkipWatermarkState, PkPrefixSkipWatermarkState, SkipWatermarkState,
41};
42use crate::hummock::multi_builder::{CapacitySplitTableBuilder, TableBuilderFactory};
43use crate::hummock::sstable_store::SstableStoreRef;
44use crate::hummock::value::HummockValue;
45use crate::hummock::{
46 Block, BlockBuilder, BlockHolder, BlockIterator, BlockMeta, BlockedXor16FilterBuilder,
47 CachePolicy, CompressionAlgorithm, GetObjectId, HummockResult, SstableBuilderOptions,
48 TableHolder, UnifiedSstableWriterFactory,
49};
50use crate::monitor::{CompactorMetrics, StoreLocalStatistic};
51
/// Iterates over the blocks of a single sstable by streaming block data from
/// the object store instead of loading the whole file, re-creating the stream
/// on retryable object-store errors.
pub struct BlockStreamIterator {
    // The underlying download stream. Lazily created on first use and reset to
    // `None` when a retryable I/O error occurs, so it can be re-created.
    block_stream: Option<BlockDataStream>,

    // Index (into `sstable.meta.block_metas`) of the next block to download.
    next_block_index: usize,

    // The sstable being iterated; provides block metadata and the filter reader.
    sstable: TableHolder,
    // Iterator over the entries of the currently decoded block, if any.
    iter: Option<BlockIterator>,
    // Progress tracker; the pending-read-IO counter is decremented on drop.
    task_progress: Arc<TaskProgress>,

    sstable_store: SstableStoreRef,
    sstable_info: SstableInfo,
    // Number of stream re-creations performed so far after object errors.
    io_retry_times: usize,
    // Upper bound on `io_retry_times` before the error is surfaced.
    max_io_retry_times: usize,
    // Accumulates remote-IO time in milliseconds (shared `remote_io_time`
    // counter from `StoreLocalStatistic`).
    stats_ptr: Arc<AtomicU64>,
}
71
impl BlockStreamIterator {
    /// Creates a new iterator over `sstable`'s blocks.
    ///
    /// No I/O is performed here; the download stream is created lazily on the
    /// first call to `download_next_block`.
    pub fn new(
        sstable: TableHolder,
        task_progress: Arc<TaskProgress>,
        sstable_store: SstableStoreRef,
        sstable_info: SstableInfo,
        max_io_retry_times: usize,
        stats_ptr: Arc<AtomicU64>,
    ) -> Self {
        Self {
            block_stream: None,
            next_block_index: 0,
            sstable,
            iter: None,
            task_progress,
            sstable_store,
            sstable_info,
            io_retry_times: 0,
            max_io_retry_times,
            stats_ptr,
        }
    }

    /// Opens a block stream starting at `next_block_index`, so that a retry
    /// resumes from the first block that has not been fully downloaded yet.
    async fn create_stream(&mut self) -> HummockResult<()> {
        let block_stream = self
            .sstable_store
            .get_stream_for_blocks(
                self.sstable_info.object_id,
                &self.sstable.meta.block_metas[self.next_block_index..],
            )
            .instrument_await("stream_iter_get_stream".verbose())
            .await?;
        self.block_stream = Some(block_stream);
        Ok(())
    }

    /// Downloads the next block, returning its (still compressed) data, its
    /// raw filter bytes and its meta, or `None` once the stream is exhausted.
    ///
    /// On a retryable object-store error the stream is dropped and re-created
    /// (up to `max_io_retry_times` attempts); other errors are returned as-is.
    /// Time spent here, including retries, is added to `stats_ptr` in
    /// milliseconds.
    pub(crate) async fn download_next_block(
        &mut self,
    ) -> HummockResult<Option<(Bytes, Vec<u8>, BlockMeta)>> {
        let now = Instant::now();
        // Record elapsed time on every exit path, including early error returns.
        let _time_stat = scopeguard::guard(self.stats_ptr.clone(), |stats_ptr: Arc<AtomicU64>| {
            let add = (now.elapsed().as_secs_f64() * 1000.0).ceil();
            stats_ptr.fetch_add(add as u64, atomic::Ordering::Relaxed);
        });
        loop {
            let ret = match &mut self.block_stream {
                Some(block_stream) => block_stream.next_block_impl().await,
                None => {
                    // No stream yet (first call, or dropped after an error):
                    // (re)create it and retry the read.
                    self.create_stream().await?;
                    continue;
                }
            };
            match ret {
                Ok(Some((data, _))) => {
                    let meta = self.sstable.meta.block_metas[self.next_block_index].clone();
                    let filter_block = self
                        .sstable
                        .filter_reader
                        .get_block_raw_filter(self.next_block_index);
                    self.next_block_index += 1;
                    return Ok(Some((data, filter_block, meta)));
                }

                Ok(None) => break,

                Err(e) => {
                    // Only object-store errors are retryable, and only up to
                    // `max_io_retry_times` attempts.
                    if !e.is_object_error() || self.io_retry_times >= self.max_io_retry_times {
                        return Err(e);
                    }

                    // Drop the broken stream; the next loop iteration will
                    // re-create it starting from `next_block_index`.
                    self.block_stream.take();
                    self.io_retry_times += 1;
                    fail_point!("create_stream_err");

                    tracing::warn!(
                        "fast compact retry create stream for sstable {} times, sstinfo={}",
                        self.io_retry_times,
                        format!(
                            "object_id={}, sst_id={}, meta_offset={}, table_ids={:?}",
                            self.sstable_info.object_id,
                            self.sstable_info.sst_id,
                            self.sstable_info.meta_offset,
                            self.sstable_info.table_ids
                        )
                    );
                }
            }
        }

        // Stream exhausted: mark this sstable as fully consumed.
        self.next_block_index = self.sstable.meta.block_metas.len();
        self.iter.take();
        Ok(None)
    }

    /// Decodes `buf` into a block and positions a fresh `BlockIterator` at its
    /// first entry.
    pub(crate) fn init_block_iter(
        &mut self,
        buf: Bytes,
        uncompressed_capacity: usize,
    ) -> HummockResult<()> {
        let block = Block::decode(buf, uncompressed_capacity)?;
        let mut iter = BlockIterator::new(BlockHolder::from_owned_block(Box::new(block)));
        iter.seek_to_first();
        self.iter = Some(iter);
        Ok(())
    }

    /// Smallest key of the next (not yet downloaded) block.
    fn next_block_smallest(&self) -> &[u8] {
        self.sstable.meta.block_metas[self.next_block_index]
            .smallest_key
            .as_ref()
    }

    /// Upper bound for keys in the next block: the smallest key of the block
    /// after it, or the sstable's largest key if the next block is the last.
    fn next_block_largest(&self) -> &[u8] {
        if self.next_block_index + 1 < self.sstable.meta.block_metas.len() {
            self.sstable.meta.block_metas[self.next_block_index + 1]
                .smallest_key
                .as_ref()
        } else {
            self.sstable.meta.largest_key.as_ref()
        }
    }

    /// Largest key bound for the block that was just downloaded: the next
    /// block's smallest key with its epoch replaced by the maximum epoch, or
    /// the sstable's largest key when no block remains.
    fn current_block_largest(&self) -> Vec<u8> {
        if self.next_block_index < self.sstable.meta.block_metas.len() {
            let mut largest_key = FullKey::decode(
                self.sstable.meta.block_metas[self.next_block_index]
                    .smallest_key
                    .as_ref(),
            );
            largest_key.epoch_with_gap = EpochWithGap::new_max_epoch();
            largest_key.encode()
        } else {
            self.sstable.meta.largest_key.clone()
        }
    }

    /// Current position: the open block iterator's key if a block is decoded,
    /// otherwise the smallest key of the next pending block.
    fn key(&self) -> FullKey<&[u8]> {
        match self.iter.as_ref() {
            Some(iter) => iter.key(),
            None => FullKey::decode(
                self.sstable.meta.block_metas[self.next_block_index]
                    .smallest_key
                    .as_ref(),
            ),
        }
    }

    /// True while there is an open block iterator or blocks left to download.
    pub(crate) fn is_valid(&self) -> bool {
        self.iter.is_some() || self.next_block_index < self.sstable.meta.block_metas.len()
    }

    #[cfg(test)]
    #[cfg(feature = "failpoints")]
    pub(crate) fn iter_mut(&mut self) -> &mut BlockIterator {
        self.iter.as_mut().unwrap()
    }
}
247
impl Drop for BlockStreamIterator {
    fn drop(&mut self) {
        // Balances the `inc_num_pending_read_io` performed by
        // `ConcatSstableIterator::seek_idx` when this iterator was created.
        self.task_progress.dec_num_pending_read_io();
    }
}
253
/// Streams a list of sstables sequentially, exposing a `BlockStreamIterator`
/// for the sstable currently being read.
pub struct ConcatSstableIterator {
    // Block iterator for the currently opened sstable; `None` before `rewind`
    // or after the last sstable has been consumed.
    sstable_iter: Option<BlockStreamIterator>,

    // Index into `sstables` of the sstable currently opened.
    cur_idx: usize,

    // All sstables to iterate, in order.
    sstables: Vec<SstableInfo>,

    sstable_store: SstableStoreRef,

    // Local statistics; its `remote_io_time` counter is shared with each
    // `BlockStreamIterator` created by `seek_idx`.
    stats: StoreLocalStatistic,
    task_progress: Arc<TaskProgress>,

    // Max stream re-creation attempts, passed down to `BlockStreamIterator`.
    max_io_retry_times: usize,
}
273
274impl ConcatSstableIterator {
275 pub fn new(
279 sst_infos: Vec<SstableInfo>,
280 sstable_store: SstableStoreRef,
281 task_progress: Arc<TaskProgress>,
282 max_io_retry_times: usize,
283 ) -> Self {
284 Self {
285 sstable_iter: None,
286 cur_idx: 0,
287 sstables: sst_infos,
288 sstable_store,
289 task_progress,
290 stats: StoreLocalStatistic::default(),
291 max_io_retry_times,
292 }
293 }
294
295 pub async fn rewind(&mut self) -> HummockResult<()> {
296 self.seek_idx(0).await
297 }
298
299 pub async fn next_sstable(&mut self) -> HummockResult<()> {
300 self.seek_idx(self.cur_idx + 1).await
301 }
302
303 pub fn current_sstable(&mut self) -> &mut BlockStreamIterator {
304 self.sstable_iter.as_mut().unwrap()
305 }
306
307 pub async fn init_block_iter(&mut self) -> HummockResult<()> {
308 if let Some(sstable) = self.sstable_iter.as_mut() {
309 if sstable.iter.is_some() {
310 return Ok(());
311 }
312 let (buf, _, meta) = sstable.download_next_block().await?.unwrap();
313 sstable.init_block_iter(buf, meta.uncompressed_size as usize)?;
314 }
315 Ok(())
316 }
317
318 pub fn is_valid(&self) -> bool {
319 self.cur_idx < self.sstables.len()
320 }
321
322 async fn seek_idx(&mut self, idx: usize) -> HummockResult<()> {
324 self.sstable_iter.take();
325 self.cur_idx = idx;
326 if self.cur_idx < self.sstables.len() {
327 let sstable_info = &self.sstables[self.cur_idx];
328 let sstable = self
329 .sstable_store
330 .sstable(sstable_info, &mut self.stats)
331 .instrument_await("stream_iter_sstable".verbose())
332 .await?;
333 self.task_progress.inc_num_pending_read_io();
334
335 let sstable_iter = BlockStreamIterator::new(
336 sstable,
337 self.task_progress.clone(),
338 self.sstable_store.clone(),
339 sstable_info.clone(),
340 self.max_io_retry_times,
341 self.stats.remote_io_time.clone(),
342 );
343 self.sstable_iter = Some(sstable_iter);
344 }
345 Ok(())
346 }
347}
348
/// Fast-compaction runner over exactly two input sstable streams: it merges
/// `left` and `right`, copying raw blocks verbatim into the output whenever
/// allowed and falling back to key-by-key compaction otherwise.
pub struct CompactorRunner {
    left: Box<ConcatSstableIterator>,
    right: Box<ConcatSstableIterator>,
    task_id: u64,
    // Key-by-key executor used for ranges where raw blocks cannot be copied.
    executor: CompactTaskExecutor<
        RemoteBuilderFactory<UnifiedSstableWriterFactory, BlockedXor16FilterBuilder>,
    >,
    // Target compression for the output; uncompressed input blocks are
    // recompressed with it before being copied raw.
    compression_algorithm: CompressionAlgorithm,
    metrics: Arc<CompactorMetrics>,
}
359
360impl CompactorRunner {
361 pub fn new(
362 context: CompactorContext,
363 task: CompactTask,
364 compaction_catalog_agent_ref: CompactionCatalogAgentRef,
365 object_id_getter: Box<dyn GetObjectId>,
366 task_progress: Arc<TaskProgress>,
367 ) -> Self {
368 let mut options: SstableBuilderOptions = context.storage_opts.as_ref().into();
369 let compression_algorithm: CompressionAlgorithm = task.compression_algorithm.into();
370 options.compression_algorithm = compression_algorithm;
371 options.capacity = task.target_file_size as usize;
372 let get_id_time = Arc::new(AtomicU64::new(0));
373
374 let key_range = KeyRange::inf();
375
376 let task_config = TaskConfig {
377 key_range,
378 cache_policy: CachePolicy::NotFill,
379 gc_delete_keys: task.gc_delete_keys,
380 retain_multiple_version: false,
381 stats_target_table_ids: Some(HashSet::from_iter(task.existing_table_ids.clone())),
382 task_type: task.task_type,
383 table_vnode_partition: task.table_vnode_partition.clone(),
384 use_block_based_filter: true,
385 table_schemas: Default::default(),
386 disable_drop_column_optimization: false,
387 };
388 let factory = UnifiedSstableWriterFactory::new(context.sstable_store.clone());
389
390 let builder_factory = RemoteBuilderFactory::<_, BlockedXor16FilterBuilder> {
391 object_id_getter,
392 limiter: context.memory_limiter.clone(),
393 options,
394 policy: task_config.cache_policy,
395 remote_rpc_cost: get_id_time,
396 compaction_catalog_agent_ref: compaction_catalog_agent_ref.clone(),
397 sstable_writer_factory: factory,
398 _phantom: PhantomData,
399 };
400 let sst_builder = CapacitySplitTableBuilder::new(
401 builder_factory,
402 context.compactor_metrics.clone(),
403 Some(task_progress.clone()),
404 task_config.table_vnode_partition.clone(),
405 context
406 .storage_opts
407 .compactor_concurrent_uploading_sst_count,
408 compaction_catalog_agent_ref.clone(),
409 );
410 assert_eq!(
411 task.input_ssts.len(),
412 2,
413 "TaskId {} target_level {:?} task {:?}",
414 task.task_id,
415 task.target_level,
416 compact_task_to_string(&task)
417 );
418 let left = Box::new(ConcatSstableIterator::new(
419 task.input_ssts[0].table_infos.clone(),
420 context.sstable_store.clone(),
421 task_progress.clone(),
422 context.storage_opts.compactor_iter_max_io_retry_times,
423 ));
424 let right = Box::new(ConcatSstableIterator::new(
425 task.input_ssts[1].table_infos.clone(),
426 context.sstable_store,
427 task_progress.clone(),
428 context.storage_opts.compactor_iter_max_io_retry_times,
429 ));
430
431 let state = PkPrefixSkipWatermarkState::from_safe_epoch_watermarks(
433 task.pk_prefix_table_watermarks.clone(),
434 );
435 let non_pk_prefix_state = NonPkPrefixSkipWatermarkState::from_safe_epoch_watermarks(
436 task.non_pk_prefix_table_watermarks.clone(),
437 compaction_catalog_agent_ref,
438 );
439
440 Self {
441 executor: CompactTaskExecutor::new(
442 sst_builder,
443 task_config,
444 task_progress,
445 state,
446 non_pk_prefix_state,
447 ),
448 left,
449 right,
450 task_id: task.task_id,
451 metrics: context.compactor_metrics.clone(),
452 compression_algorithm,
453 }
454 }
455
456 pub async fn run(mut self) -> HummockResult<(Vec<LocalSstableInfo>, CompactionStatistics)> {
457 self.left.rewind().await?;
458 self.right.rewind().await?;
459 let mut skip_raw_block_count = 0;
460 let mut skip_raw_block_size = 0;
461 while self.left.is_valid() && self.right.is_valid() {
462 let ret = self
463 .left
464 .current_sstable()
465 .key()
466 .cmp(&self.right.current_sstable().key());
467 let (first, second) = if ret == Ordering::Less {
468 (&mut self.left, &mut self.right)
469 } else {
470 (&mut self.right, &mut self.left)
471 };
472 assert!(
473 ret != Ordering::Equal,
474 "sst range overlap equal_key {:?}",
475 self.left.current_sstable().key()
476 );
477 if first.current_sstable().iter.is_none() {
478 let right_key = second.current_sstable().key();
479 while first.current_sstable().is_valid() && !self.executor.builder.need_flush() {
480 let full_key = FullKey::decode(first.current_sstable().next_block_largest());
481 if full_key.user_key.ge(&right_key.user_key) {
484 break;
485 }
486 let smallest_key =
487 FullKey::decode(first.current_sstable().next_block_smallest());
488 if !self.executor.shall_copy_raw_block(&smallest_key) {
489 break;
490 }
491 let smallest_key = smallest_key.to_vec();
492
493 let (mut block, filter_data, mut meta) = first
494 .current_sstable()
495 .download_next_block()
496 .await?
497 .unwrap();
498 let algorithm = Block::get_algorithm(&block)?;
499 if algorithm == CompressionAlgorithm::None
500 && algorithm != self.compression_algorithm
501 {
502 block = BlockBuilder::compress_block(block, self.compression_algorithm)?;
503 meta.len = block.len() as u32;
504 }
505
506 let largest_key = first.current_sstable().current_block_largest();
507 let block_len = block.len() as u64;
508 let block_key_count = meta.total_key_count;
509
510 if self
511 .executor
512 .builder
513 .add_raw_block(block, filter_data, smallest_key, largest_key, meta)
514 .await?
515 {
516 skip_raw_block_size += block_len;
517 skip_raw_block_count += 1;
518 }
519 self.executor.may_report_process_key(block_key_count);
520 self.executor.clear();
521 }
522 if !first.current_sstable().is_valid() {
523 first.next_sstable().await?;
524 continue;
525 }
526 first.init_block_iter().await?;
527 }
528
529 let target_key = second.current_sstable().key();
530 let iter = first.sstable_iter.as_mut().unwrap().iter.as_mut().unwrap();
531 self.executor.run(iter, target_key).await?;
532 if !iter.is_valid() {
533 first.sstable_iter.as_mut().unwrap().iter.take();
534 if !first.current_sstable().is_valid() {
535 first.next_sstable().await?;
536 }
537 }
538 }
539 let rest_data = if !self.left.is_valid() {
540 &mut self.right
541 } else {
542 &mut self.left
543 };
544 if rest_data.is_valid() {
545 let sstable_iter = rest_data.sstable_iter.as_mut().unwrap();
547 let target_key = FullKey::decode(&sstable_iter.sstable.meta.largest_key);
548 if let Some(iter) = sstable_iter.iter.as_mut() {
549 self.executor.run(iter, target_key).await?;
550 assert!(
551 !iter.is_valid(),
552 "iter should not be valid key {:?}",
553 iter.key()
554 );
555 }
556 sstable_iter.iter.take();
557 }
558
559 while rest_data.is_valid() {
560 let mut sstable_iter = rest_data.sstable_iter.take().unwrap();
561 while sstable_iter.is_valid() {
562 let smallest_key = FullKey::decode(sstable_iter.next_block_smallest()).to_vec();
563 let (block, filter_data, block_meta) =
564 sstable_iter.download_next_block().await?.unwrap();
565 let need_deleted = self.executor.last_key.user_key.eq(&smallest_key.user_key)
567 && self.executor.last_key_is_delete;
568 if self.executor.builder.need_flush()
569 || need_deleted
570 || !self.executor.shall_copy_raw_block(&smallest_key.to_ref())
571 {
572 let largest_key = sstable_iter.sstable.meta.largest_key.clone();
573 let target_key = FullKey::decode(&largest_key);
574 sstable_iter.init_block_iter(block, block_meta.uncompressed_size as usize)?;
575 let mut iter = sstable_iter.iter.take().unwrap();
576 self.executor.run(&mut iter, target_key).await?;
577 } else {
578 let largest_key = sstable_iter.current_block_largest();
579 let block_len = block.len() as u64;
580 let block_key_count = block_meta.total_key_count;
581 if self
582 .executor
583 .builder
584 .add_raw_block(block, filter_data, smallest_key, largest_key, block_meta)
585 .await?
586 {
587 skip_raw_block_count += 1;
588 skip_raw_block_size += block_len;
589 }
590 self.executor.may_report_process_key(block_key_count);
591 self.executor.clear();
592 }
593 }
594 rest_data.next_sstable().await?;
595 }
596 let mut total_read_bytes = 0;
597 for sst in &self.left.sstables {
598 total_read_bytes += sst.sst_size;
599 }
600 for sst in &self.right.sstables {
601 total_read_bytes += sst.sst_size;
602 }
603 self.metrics
604 .compact_fast_runner_bytes
605 .inc_by(skip_raw_block_size);
606 tracing::info!(
607 "OPTIMIZATION: skip {} blocks for task-{}, optimize {}% data compression",
608 skip_raw_block_count,
609 self.task_id,
610 skip_raw_block_size * 100 / total_read_bytes,
611 );
612
613 let statistic = self.executor.take_statistics();
614 let output_ssts = self.executor.builder.finish().await?;
615 Compactor::report_progress(
616 self.metrics.clone(),
617 Some(self.executor.task_progress.clone()),
618 &output_ssts,
619 false,
620 );
621 let sst_infos = output_ssts
622 .iter()
623 .map(|sst| sst.sst_info.clone())
624 .collect_vec();
625 assert!(can_concat(&sst_infos));
626 Ok((output_ssts, statistic))
627 }
628}
629
/// Key-by-key compaction executor used by the fast compactor for key ranges
/// that cannot be copied as raw blocks: it drops obsolete versions, GC'd
/// deletes and watermark-covered keys while feeding survivors to the builder.
pub struct CompactTaskExecutor<F: TableBuilderFactory> {
    // Most recent key handed to the executor; used to detect new user keys.
    last_key: FullKey<Vec<u8>>,
    compaction_statistics: CompactionStatistics,
    // Table id of `last_key`; a change triggers flushing per-table drop stats.
    last_table_id: Option<u32>,
    last_table_stats: TableStats,
    builder: CapacitySplitTableBuilder<F>,
    task_config: TaskConfig,
    task_progress: Arc<TaskProgress>,
    skip_watermark_state: PkPrefixSkipWatermarkState,
    // Whether the latest version of `last_key` was dropped as a delete.
    last_key_is_delete: bool,
    // Keys processed since the last progress report (flushed past ~100 keys).
    progress_key_num: u32,
    non_pk_prefix_skip_watermark_state: NonPkPrefixSkipWatermarkState,
}
643
impl<F: TableBuilderFactory> CompactTaskExecutor<F> {
    pub fn new(
        builder: CapacitySplitTableBuilder<F>,
        task_config: TaskConfig,
        task_progress: Arc<TaskProgress>,
        skip_watermark_state: PkPrefixSkipWatermarkState,
        non_pk_prefix_skip_watermark_state: NonPkPrefixSkipWatermarkState,
    ) -> Self {
        Self {
            builder,
            task_config,
            last_key: FullKey::default(),
            last_key_is_delete: false,
            compaction_statistics: CompactionStatistics::default(),
            last_table_id: None,
            last_table_stats: TableStats::default(),
            task_progress,
            skip_watermark_state,
            progress_key_num: 0,
            non_pk_prefix_skip_watermark_state,
        }
    }

    /// Flushes the pending per-table stats and returns the accumulated
    /// statistics, resetting the internal accumulators.
    fn take_statistics(&mut self) -> CompactionStatistics {
        if let Some(last_table_id) = self.last_table_id.take() {
            self.compaction_statistics
                .delta_drop_stat
                .insert(last_table_id, std::mem::take(&mut self.last_table_stats));
        }
        std::mem::take(&mut self.compaction_statistics)
    }

    /// Resets last-key tracking so the next key processed is treated as a new
    /// user key (called after a raw block is copied into the output).
    fn clear(&mut self) {
        if !self.last_key.is_empty() {
            self.last_key = FullKey::default();
        }
        self.last_key_is_delete = false;
    }

    /// Accumulates `key_count` into the progress counter and reports it to
    /// `task_progress` once more than 100 keys have piled up.
    #[inline(always)]
    fn may_report_process_key(&mut self, key_count: u32) {
        const PROGRESS_KEY_INTERVAL: u32 = 100;
        self.progress_key_num += key_count;
        if self.progress_key_num > PROGRESS_KEY_INTERVAL {
            self.task_progress
                .inc_progress_key(self.progress_key_num as u64);
            self.progress_key_num = 0;
        }
    }

    /// Consumes `iter` up to and including `target_key`, dropping obsolete
    /// versions, GC-able deletes and watermark-covered keys, and adding every
    /// surviving entry to the builder.
    pub async fn run(
        &mut self,
        iter: &mut BlockIterator,
        target_key: FullKey<&[u8]>,
    ) -> HummockResult<()> {
        self.skip_watermark_state.reset_watermark();
        self.non_pk_prefix_skip_watermark_state.reset_watermark();

        while iter.is_valid() && iter.key().le(&target_key) {
            let is_new_user_key =
                !self.last_key.is_empty() && iter.key().user_key != self.last_key.user_key.as_ref();
            self.compaction_statistics.iter_total_key_counts += 1;
            self.may_report_process_key(1);

            let mut drop = false;
            let value = HummockValue::from_slice(iter.value()).unwrap();
            let is_first_or_new_user_key = is_new_user_key || self.last_key.is_empty();
            if is_first_or_new_user_key {
                self.last_key.set(iter.key());
                self.last_key_is_delete = false;
            }

            // When only a single version is retained: drop tombstones if
            // delete-key GC is enabled, and drop older versions of a user key.
            if !self.task_config.retain_multiple_version
                && self.task_config.gc_delete_keys
                && value.is_delete()
            {
                drop = true;
                self.last_key_is_delete = true;
            } else if !self.task_config.retain_multiple_version && !is_first_or_new_user_key {
                drop = true;
            }

            if self.watermark_should_delete(&iter.key()) {
                drop = true;
                self.last_key_is_delete = true;
            }

            // Flush per-table drop stats whenever the key stream crosses a
            // table boundary.
            if self.last_table_id != Some(self.last_key.user_key.table_id.table_id) {
                if let Some(last_table_id) = self.last_table_id.take() {
                    self.compaction_statistics
                        .delta_drop_stat
                        .insert(last_table_id, std::mem::take(&mut self.last_table_stats));
                }
                self.last_table_id = Some(self.last_key.user_key.table_id.table_id);
            }

            if drop {
                self.compaction_statistics.iter_drop_key_counts += 1;

                // Only account the drop against tables the task targets.
                let should_count = match self.task_config.stats_target_table_ids.as_ref() {
                    Some(target_table_ids) => {
                        target_table_ids.contains(&self.last_key.user_key.table_id.table_id)
                    }
                    None => true,
                };
                if should_count {
                    self.last_table_stats.total_key_count -= 1;
                    self.last_table_stats.total_key_size -= self.last_key.encoded_len() as i64;
                    self.last_table_stats.total_value_size -= value.encoded_len() as i64;
                }
                iter.next();
                continue;
            }
            self.builder
                .add_full_key(iter.key(), value, is_new_user_key)
                .await?;
            iter.next();
        }
        Ok(())
    }

    /// Whether a raw block starting at `smallest_key` may be copied verbatim.
    /// Returns false when the block's first user key equals the last processed
    /// key whose latest version was dropped as a delete, or when a watermark
    /// would drop the block's first key.
    pub fn shall_copy_raw_block(&mut self, smallest_key: &FullKey<&[u8]>) -> bool {
        if self.last_key_is_delete && self.last_key.user_key.as_ref().eq(&smallest_key.user_key) {
            return false;
        }

        if self.watermark_should_delete(smallest_key) {
            return false;
        }

        true
    }

    /// True when either watermark state is active and would drop `key`.
    fn watermark_should_delete(&mut self, key: &FullKey<&[u8]>) -> bool {
        (self.skip_watermark_state.has_watermark() && self.skip_watermark_state.should_delete(key))
            || (self.non_pk_prefix_skip_watermark_state.has_watermark()
                && self.non_pk_prefix_skip_watermark_state.should_delete(key))
    }
}