use std::cmp::Ordering;
use std::collections::HashSet;
use std::marker::PhantomData;
use std::sync::atomic::AtomicU64;
use std::sync::{Arc, atomic};
use std::time::Instant;

use await_tree::{InstrumentAwait, SpanExt};
use bytes::Bytes;
use fail::fail_point;
use itertools::Itertools;
use risingwave_common::catalog::TableId;
use risingwave_hummock_sdk::compact_task::CompactTask;
use risingwave_hummock_sdk::key::FullKey;
use risingwave_hummock_sdk::key_range::KeyRange;
use risingwave_hummock_sdk::sstable_info::SstableInfo;
use risingwave_hummock_sdk::table_stats::TableStats;
use risingwave_hummock_sdk::{EpochWithGap, LocalSstableInfo, can_concat, compact_task_to_string};
use risingwave_pb::hummock::BloomFilterType;

use crate::compaction_catalog_manager::CompactionCatalogAgentRef;
use crate::hummock::block_stream::BlockDataStream;
use crate::hummock::compactor::task_progress::TaskProgress;
use crate::hummock::compactor::{
    CompactionFilter, CompactionStatistics, Compactor, CompactorContext, MultiCompactionFilter,
    RemoteBuilderFactory, TaskConfig,
};
use crate::hummock::iterator::{
    NonPkPrefixSkipWatermarkState, PkPrefixSkipWatermarkState, SkipWatermarkState,
    ValueSkipWatermarkState,
};
use crate::hummock::multi_builder::{CapacitySplitTableBuilder, TableBuilderFactory};
use crate::hummock::sstable_store::SstableStoreRef;
use crate::hummock::value::HummockValue;
use crate::hummock::{
    Block, BlockBuilder, BlockHolder, BlockIterator, BlockMeta, BlockedXor16FilterBuilder,
    CachePolicy, CompressionAlgorithm, GetObjectId, HummockResult, SstableBuilderOptions,
    TableHolder, UnifiedSstableWriterFactory,
};
use crate::monitor::{CompactorMetrics, StoreLocalStatistic};

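/// Streams the blocks of a single SST from object store one at a time,
/// exposing block-level metadata so callers can either copy a block verbatim
/// into the output or iterate over its entries.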
pub struct BlockStreamIterator {
    /// The underlying download stream. It is lazily (re)created and dropped
    /// on IO error so that `download_next_block` can retry from the current
    /// block.
    block_stream: Option<BlockDataStream>,

    /// Index of the next block to download within `sstable.meta.block_metas`.
    next_block_index: usize,

    sstable: TableHolder,
    iter: Option<BlockIterator>,
    task_progress: Arc<TaskProgress>,

    sstable_store: SstableStoreRef,
    sstable_info: SstableInfo,
    io_retry_times: usize,
    max_io_retry_times: usize,
    /// Accumulates remote IO time, in milliseconds.
    stats_ptr: Arc<AtomicU64>,
}

impl BlockStreamIterator {
    pub fn new(
        sstable: TableHolder,
        task_progress: Arc<TaskProgress>,
        sstable_store: SstableStoreRef,
        sstable_info: SstableInfo,
        max_io_retry_times: usize,
        stats_ptr: Arc<AtomicU64>,
    ) -> Self {
        Self {
            block_stream: None,
            next_block_index: 0,
            sstable,
            iter: None,
            task_progress,
            sstable_store,
            sstable_info,
            io_retry_times: 0,
            max_io_retry_times,
            stats_ptr,
        }
    }

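    /// Opens a stream over the blocks that have not been consumed yet,
    /// starting at `next_block_index`.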
    async fn create_stream(&mut self) -> HummockResult<()> {
        let block_stream = self
            .sstable_store
            .get_stream_for_blocks(
                self.sstable_info.object_id,
                &self.sstable.meta.block_metas[self.next_block_index..],
            )
            .instrument_await("stream_iter_get_stream".verbose())
            .await?;
        self.block_stream = Some(block_stream);
        Ok(())
    }

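    /// Downloads the next block and returns its (possibly compressed) data,
    /// its raw filter bytes, and its metadata. On a retryable object-store
    /// error, the stream is dropped and recreated, up to `max_io_retry_times`
    /// attempts. Returns `Ok(None)` once all blocks have been consumed.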
    pub(crate) async fn download_next_block(
        &mut self,
    ) -> HummockResult<Option<(Bytes, Vec<u8>, BlockMeta)>> {
        let now = Instant::now();
        let _time_stat = scopeguard::guard(self.stats_ptr.clone(), |stats_ptr: Arc<AtomicU64>| {
            let add = (now.elapsed().as_secs_f64() * 1000.0).ceil();
            stats_ptr.fetch_add(add as u64, atomic::Ordering::Relaxed);
        });
        loop {
            let ret = match &mut self.block_stream {
                Some(block_stream) => block_stream.next_block_impl().await,
                None => {
                    self.create_stream().await?;
                    continue;
                }
            };
            match ret {
                Ok(Some((data, _))) => {
                    let meta = self.sstable.meta.block_metas[self.next_block_index].clone();
                    let filter_block = self
                        .sstable
                        .filter_reader
                        .get_block_raw_filter(self.next_block_index);
                    self.next_block_index += 1;
                    return Ok(Some((data, filter_block, meta)));
                }

                Ok(None) => break,

                Err(e) => {
                    if !e.is_object_error() || self.io_retry_times >= self.max_io_retry_times {
                        return Err(e);
                    }

                    // Drop the broken stream; the next loop iteration
                    // recreates it starting at `next_block_index`.
                    self.block_stream.take();
                    self.io_retry_times += 1;
                    fail_point!("create_stream_err");

                    tracing::warn!(
                        "fast compact retried creating stream for sstable {} times, sstinfo={}",
                        self.io_retry_times,
                        format!(
                            "object_id={}, sst_id={}, meta_offset={}, table_ids={:?}",
                            self.sstable_info.object_id,
                            self.sstable_info.sst_id,
                            self.sstable_info.meta_offset,
                            self.sstable_info.table_ids
                        )
                    );
                }
            }
        }

        self.next_block_index = self.sstable.meta.block_metas.len();
        self.iter.take();
        Ok(None)
    }

    pub(crate) fn init_block_iter(
        &mut self,
        buf: Bytes,
        uncompressed_capacity: usize,
    ) -> HummockResult<()> {
        let block = Block::decode(buf, uncompressed_capacity)?;
        let mut iter = BlockIterator::new(BlockHolder::from_owned_block(Box::new(block)));
        iter.seek_to_first();
        self.iter = Some(iter);
        Ok(())
    }

    /// Returns the smallest key of the next block to be downloaded.
    fn next_block_smallest(&self) -> &[u8] {
        self.sstable.meta.block_metas[self.next_block_index]
            .smallest_key
            .as_ref()
    }

    /// Returns an upper bound for the next block: the smallest key of the
    /// block after it, or the SST's largest key for the final block.
    fn next_block_largest(&self) -> &[u8] {
        if self.next_block_index + 1 < self.sstable.meta.block_metas.len() {
            self.sstable.meta.block_metas[self.next_block_index + 1]
                .smallest_key
                .as_ref()
        } else {
            self.sstable.meta.largest_key.as_ref()
        }
    }

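    /// Returns an inclusive largest key for the block that was just
    /// downloaded, derived from the next block's smallest key.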
    fn current_block_largest(&self) -> Vec<u8> {
        if self.next_block_index < self.sstable.meta.block_metas.len() {
            let mut largest_key = FullKey::decode(
                self.sstable.meta.block_metas[self.next_block_index]
                    .smallest_key
                    .as_ref(),
            );
            // Do not include this key itself: it is the smallest key of the
            // next block, so bump the epoch to the maximum to exclude it.
            largest_key.epoch_with_gap = EpochWithGap::new_max_epoch();
            largest_key.encode()
        } else {
            self.sstable.meta.largest_key.clone()
        }
    }

    fn key(&self) -> FullKey<&[u8]> {
        match self.iter.as_ref() {
            Some(iter) => iter.key(),
            None => FullKey::decode(
                self.sstable.meta.block_metas[self.next_block_index]
                    .smallest_key
                    .as_ref(),
            ),
        }
    }

    pub(crate) fn is_valid(&self) -> bool {
        self.iter.is_some() || self.next_block_index < self.sstable.meta.block_metas.len()
    }

    #[cfg(test)]
    #[cfg(feature = "failpoints")]
    pub(crate) fn iter_mut(&mut self) -> &mut BlockIterator {
        self.iter.as_mut().unwrap()
    }
}

impl Drop for BlockStreamIterator {
    fn drop(&mut self) {
        self.task_progress.dec_num_pending_read_io();
    }
}

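/// Iterates over the SSTs of one non-overlapping input level, streaming the
/// blocks of one SST at a time.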
pub struct ConcatSstableIterator {
    /// The iterator over the current SST, if any.
    sstable_iter: Option<BlockStreamIterator>,

    /// Index of the current SST within `sstables`.
    cur_idx: usize,

    /// The SSTs to iterate, in key order and non-overlapping.
    sstables: Vec<SstableInfo>,

    sstable_store: SstableStoreRef,

    stats: StoreLocalStatistic,
    task_progress: Arc<TaskProgress>,

    max_io_retry_times: usize,
}

impl ConcatSstableIterator {
    pub fn new(
        sst_infos: Vec<SstableInfo>,
        sstable_store: SstableStoreRef,
        task_progress: Arc<TaskProgress>,
        max_io_retry_times: usize,
    ) -> Self {
        Self {
            sstable_iter: None,
            cur_idx: 0,
            sstables: sst_infos,
            sstable_store,
            task_progress,
            stats: StoreLocalStatistic::default(),
            max_io_retry_times,
        }
    }

    pub async fn rewind(&mut self) -> HummockResult<()> {
        self.seek_idx(0).await
    }

    pub async fn next_sstable(&mut self) -> HummockResult<()> {
        self.seek_idx(self.cur_idx + 1).await
    }

    pub fn current_sstable(&mut self) -> &mut BlockStreamIterator {
        self.sstable_iter.as_mut().unwrap()
    }

    pub async fn init_block_iter(&mut self) -> HummockResult<()> {
        if let Some(sstable) = self.sstable_iter.as_mut() {
            if sstable.iter.is_some() {
                return Ok(());
            }
            let (buf, _, meta) = sstable.download_next_block().await?.unwrap();
            sstable.init_block_iter(buf, meta.uncompressed_size as usize)?;
        }
        Ok(())
    }

    pub fn is_valid(&self) -> bool {
        self.cur_idx < self.sstables.len()
    }

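    /// Positions the iterator at `idx`, fetching that SST's metadata and
    /// opening a fresh `BlockStreamIterator` over it.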
    async fn seek_idx(&mut self, idx: usize) -> HummockResult<()> {
        self.sstable_iter.take();
        self.cur_idx = idx;
        if self.cur_idx < self.sstables.len() {
            let sstable_info = &self.sstables[self.cur_idx];
            let sstable = self
                .sstable_store
                .sstable(sstable_info, &mut self.stats)
                .instrument_await("stream_iter_sstable".verbose())
                .await?;
            self.task_progress.inc_num_pending_read_io();

            let sstable_iter = BlockStreamIterator::new(
                sstable,
                self.task_progress.clone(),
                self.sstable_store.clone(),
                sstable_info.clone(),
                self.max_io_retry_times,
                self.stats.remote_io_time.clone(),
            );
            self.sstable_iter = Some(sstable_iter);
        }
        Ok(())
    }
}

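/// Fast compaction runner: merges exactly two non-overlapping input levels,
/// copying whole raw blocks into the output whenever no key in a block needs
/// to be rewritten, and falling back to key-by-key compaction otherwise.
///
/// A minimal sketch of how a runner is driven, mirroring the test at the
/// bottom of this file (building a real `CompactorContext` and `CompactTask`
/// takes more setup than shown here):
///
/// ```ignore
/// let runner = CompactorRunner::new(
///     context,
///     task,
///     compaction_catalog_agent_ref,
///     object_id_getter,
///     Arc::new(TaskProgress::default()),
///     MultiCompactionFilter::default(),
/// );
/// let (output_ssts, statistics) = runner.run().await?;
/// ```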
pub struct CompactorRunner<C: CompactionFilter = MultiCompactionFilter> {
    left: Box<ConcatSstableIterator>,
    right: Box<ConcatSstableIterator>,
    task_id: u64,
    executor: CompactTaskExecutor<
        RemoteBuilderFactory<UnifiedSstableWriterFactory, BlockedXor16FilterBuilder>,
        C,
    >,
    compression_algorithm: CompressionAlgorithm,
    metrics: Arc<CompactorMetrics>,
}

impl<C: CompactionFilter> CompactorRunner<C> {
    pub fn new(
        context: CompactorContext,
        task: CompactTask,
        compaction_catalog_agent_ref: CompactionCatalogAgentRef,
        object_id_getter: Arc<dyn GetObjectId>,
        task_progress: Arc<TaskProgress>,
        compaction_filter: C,
    ) -> Self {
        let mut options: SstableBuilderOptions = context.storage_opts.as_ref().into();
        let compression_algorithm: CompressionAlgorithm = task.compression_algorithm.into();
        options.compression_algorithm = compression_algorithm;
        options.capacity = task.target_file_size as usize;
        options.max_vnode_key_range_bytes = None;
        let get_id_time = Arc::new(AtomicU64::new(0));

        let key_range = KeyRange::inf();
        let read_table_ids = HashSet::from_iter(task.get_table_ids_from_input_ssts());

        let task_config = TaskConfig {
            key_range,
            cache_policy: CachePolicy::NotFill,
            gc_delete_keys: task.gc_delete_keys,
            retain_multiple_version: false,
            table_vnode_partition: task.table_vnode_partition.clone(),
            use_block_based_filter: true,
            table_schemas: Default::default(),
            disable_drop_column_optimization: false,
        };
        let factory = UnifiedSstableWriterFactory::new(context.sstable_store.clone());

        let builder_factory = RemoteBuilderFactory::<_, BlockedXor16FilterBuilder> {
            object_id_getter,
            limiter: context.memory_limiter.clone(),
            options,
            policy: task_config.cache_policy,
            remote_rpc_cost: get_id_time,
            compaction_catalog_agent_ref: compaction_catalog_agent_ref.clone(),
            sstable_writer_factory: factory,
            _phantom: PhantomData,
        };
        let sst_builder = CapacitySplitTableBuilder::new(
            builder_factory,
            context.compactor_metrics.clone(),
            Some(task_progress.clone()),
            task_config.table_vnode_partition.clone(),
            context
                .storage_opts
                .compactor_concurrent_uploading_sst_count,
            compaction_catalog_agent_ref.clone(),
        );
        assert_eq!(
            task.input_ssts.len(),
            2,
            "TaskId {} target_level {:?} task {:?}",
            task.task_id,
            task.target_level,
            compact_task_to_string(&task)
        );
        let left_ssts = task.input_ssts[0]
            .read_sstable_infos()
            .cloned()
            .collect_vec();
        let right_ssts = task.input_ssts[1]
            .read_sstable_infos()
            .cloned()
            .collect_vec();
        assert!(
            left_ssts
                .iter()
                .chain(right_ssts.iter())
                .all(|sst| sst.bloom_filter_kind == BloomFilterType::Blocked),
            "fast compaction requires blocked-filter SSTs: {}",
            compact_task_to_string(&task)
        );
        let left = Box::new(ConcatSstableIterator::new(
            left_ssts,
            context.sstable_store.clone(),
            task_progress.clone(),
            context.storage_opts.compactor_iter_max_io_retry_times,
        ));
        let right = Box::new(ConcatSstableIterator::new(
            right_ssts,
            context.sstable_store,
            task_progress.clone(),
            context.storage_opts.compactor_iter_max_io_retry_times,
        ));

        let pk_prefix_state = PkPrefixSkipWatermarkState::from_safe_epoch_watermarks(
            task.pk_prefix_table_watermarks.clone(),
        );
        let non_pk_prefix_state = NonPkPrefixSkipWatermarkState::from_safe_epoch_watermarks(
            task.non_pk_prefix_table_watermarks.clone(),
            compaction_catalog_agent_ref.clone(),
        );
        let value_skip_watermark_state = ValueSkipWatermarkState::from_safe_epoch_watermarks(
            task.value_table_watermarks.clone(),
            compaction_catalog_agent_ref,
        );

        Self {
            executor: CompactTaskExecutor::new(
                sst_builder,
                task_config,
                task_progress,
                pk_prefix_state,
                non_pk_prefix_state,
                value_skip_watermark_state,
                compaction_filter,
                read_table_ids,
            ),
            left,
            right,
            task_id: task.task_id,
            metrics: context.compactor_metrics,
            compression_algorithm,
        }
    }

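    /// Drives the merge: while both inputs are valid, the side with the
    /// smaller head key is advanced. Whole blocks that end strictly before
    /// the other side's head key and pass `shall_copy_raw_block` are copied
    /// verbatim; everything else goes through `CompactTaskExecutor::run`
    /// key by key. The tail of whichever input remains is then drained the
    /// same way.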
    pub async fn run(mut self) -> HummockResult<(Vec<LocalSstableInfo>, CompactionStatistics)> {
        self.left.rewind().await?;
        self.right.rewind().await?;
        let mut skip_raw_block_count = 0;
        let mut skip_raw_block_size = 0;
        while self.left.is_valid() && self.right.is_valid() {
            let ret = self
                .left
                .current_sstable()
                .key()
                .cmp(&self.right.current_sstable().key());
            let (first, second) = if ret == Ordering::Less {
                (&mut self.left, &mut self.right)
            } else {
                (&mut self.right, &mut self.left)
            };
            assert!(
                ret != Ordering::Equal,
                "sst range overlap equal_key {:?}",
                self.left.current_sstable().key()
            );
            if first.current_sstable().iter.is_none() {
                let right_key = second.current_sstable().key();
                while first.current_sstable().is_valid() && !self.executor.builder.need_flush() {
                    let full_key = FullKey::decode(first.current_sstable().next_block_largest());
                    // The whole block must end before the other side's head
                    // key; otherwise the two inputs interleave within it.
                    if full_key.user_key.ge(&right_key.user_key) {
                        break;
                    }
                    let smallest_key =
                        FullKey::decode(first.current_sstable().next_block_smallest());
                    if !self.executor.shall_copy_raw_block(&smallest_key) {
                        break;
                    }
                    let smallest_key = smallest_key.to_vec();

                    let (mut block, filter_data, mut meta) = first
                        .current_sstable()
                        .download_next_block()
                        .await?
                        .unwrap();
                    let algorithm = Block::get_algorithm(&block)?;
                    if algorithm == CompressionAlgorithm::None
                        && algorithm != self.compression_algorithm
                    {
                        block = BlockBuilder::compress_block(block, self.compression_algorithm)?;
                        meta.len = block.len() as u32;
                    }

                    let largest_key = first.current_sstable().current_block_largest();
                    let block_len = block.len() as u64;
                    let block_key_count = meta.total_key_count;

                    if self
                        .executor
                        .builder
                        .add_raw_block(block, filter_data, smallest_key, largest_key, meta)
                        .await?
                    {
                        skip_raw_block_size += block_len;
                        skip_raw_block_count += 1;
                    }
                    self.executor.may_report_process_key(block_key_count);
                    self.executor.clear();
                }
                if !first.current_sstable().is_valid() {
                    first.next_sstable().await?;
                    continue;
                }
                first.init_block_iter().await?;
            }

            let target_key = second.current_sstable().key();
            let iter = first.sstable_iter.as_mut().unwrap().iter.as_mut().unwrap();
            self.executor.reset_watermark();
            self.executor.run(iter, target_key).await?;
            if !iter.is_valid() {
                first.sstable_iter.as_mut().unwrap().iter.take();
                if !first.current_sstable().is_valid() {
                    first.next_sstable().await?;
                }
            }
        }
        let rest_data = if !self.left.is_valid() {
            &mut self.right
        } else {
            &mut self.left
        };
        if rest_data.is_valid() {
            // Finish the block that the key-by-key executor was in the
            // middle of, if any.
            let sstable_iter = rest_data.sstable_iter.as_mut().unwrap();
            let target_key = FullKey::decode(&sstable_iter.sstable.meta.largest_key);
            if let Some(iter) = sstable_iter.iter.as_mut() {
                self.executor.reset_watermark();
                self.executor.run(iter, target_key).await?;
                assert!(
                    !iter.is_valid(),
                    "iter should not be valid key {:?}",
                    iter.key()
                );
            }
            sstable_iter.iter.take();
        }

        while rest_data.is_valid() {
            let mut sstable_iter = rest_data.sstable_iter.take().unwrap();
            while sstable_iter.is_valid() {
                let smallest_key = FullKey::decode(sstable_iter.next_block_smallest()).to_vec();
                let (block, filter_data, block_meta) =
                    sstable_iter.download_next_block().await?.unwrap();
                // If the last emitted key was a delete tombstone for the same
                // user key, the block's head versions must not be copied raw.
                let need_deleted = self.executor.last_key.user_key.eq(&smallest_key.user_key)
                    && self.executor.last_key_is_delete;
                if self.executor.builder.need_flush()
                    || need_deleted
                    || !self.executor.shall_copy_raw_block(&smallest_key.to_ref())
                {
                    let largest_key = sstable_iter.sstable.meta.largest_key.clone();
                    let target_key = FullKey::decode(&largest_key);
                    sstable_iter.init_block_iter(block, block_meta.uncompressed_size as usize)?;
                    let mut iter = sstable_iter.iter.take().unwrap();
                    self.executor.reset_watermark();
                    self.executor.run(&mut iter, target_key).await?;
                } else {
                    let largest_key = sstable_iter.current_block_largest();
                    let block_len = block.len() as u64;
                    let block_key_count = block_meta.total_key_count;
                    if self
                        .executor
                        .builder
                        .add_raw_block(block, filter_data, smallest_key, largest_key, block_meta)
                        .await?
                    {
                        skip_raw_block_count += 1;
                        skip_raw_block_size += block_len;
                    }
                    self.executor.may_report_process_key(block_key_count);
                    self.executor.clear();
                }
            }
            rest_data.next_sstable().await?;
        }
        let mut total_read_bytes = 0;
        for sst in &self.left.sstables {
            total_read_bytes += sst.sst_size;
        }
        for sst in &self.right.sstables {
            total_read_bytes += sst.sst_size;
        }
        self.metrics
            .compact_fast_runner_bytes
            .inc_by(skip_raw_block_size);
        tracing::info!(
            "OPTIMIZATION: skipped recompression of {} blocks for task-{}, {}% of input bytes copied raw",
            skip_raw_block_count,
            self.task_id,
            skip_raw_block_size * 100 / total_read_bytes,
        );

        let statistic = self.executor.take_statistics();
        let output_ssts = self.executor.builder.finish().await?;
        Compactor::report_progress(
            self.metrics.clone(),
            Some(self.executor.task_progress.clone()),
            &output_ssts,
            false,
        );
        let sst_infos = output_ssts
            .iter()
            .map(|sst| sst.sst_info.clone())
            .collect_vec();
        assert!(can_concat(&sst_infos));
        Ok((output_ssts, statistic))
    }
}

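/// Key-by-key compaction executor shared by the raw-block fast path and the
/// fallback path: it applies version GC, the compaction filter, and the
/// three kinds of table watermarks, feeding surviving keys to the builder.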
pub struct CompactTaskExecutor<F: TableBuilderFactory, C: CompactionFilter> {
    last_key: FullKey<Vec<u8>>,
    compaction_statistics: CompactionStatistics,
    last_table_id: Option<TableId>,
    last_table_stats: TableStats,
    builder: CapacitySplitTableBuilder<F>,
    task_config: TaskConfig,
    task_progress: Arc<TaskProgress>,
    pk_prefix_skip_watermark_state: PkPrefixSkipWatermarkState,
    last_key_is_delete: bool,
    progress_key_num: u32,
    non_pk_prefix_skip_watermark_state: NonPkPrefixSkipWatermarkState,
    value_skip_watermark_state: ValueSkipWatermarkState,
    compaction_filter: C,
    read_table_ids: HashSet<TableId>,
}

impl<F: TableBuilderFactory, C: CompactionFilter> CompactTaskExecutor<F, C> {
    pub fn new(
        builder: CapacitySplitTableBuilder<F>,
        task_config: TaskConfig,
        task_progress: Arc<TaskProgress>,
        pk_prefix_skip_watermark_state: PkPrefixSkipWatermarkState,
        non_pk_prefix_skip_watermark_state: NonPkPrefixSkipWatermarkState,
        value_skip_watermark_state: ValueSkipWatermarkState,
        compaction_filter: C,
        read_table_ids: HashSet<TableId>,
    ) -> Self {
        Self {
            builder,
            task_config,
            last_key: FullKey::default(),
            last_key_is_delete: false,
            compaction_statistics: CompactionStatistics::default(),
            last_table_id: None,
            last_table_stats: TableStats::default(),
            task_progress,
            pk_prefix_skip_watermark_state,
            progress_key_num: 0,
            non_pk_prefix_skip_watermark_state,
            value_skip_watermark_state,
            compaction_filter,
            read_table_ids,
        }
    }

    fn take_statistics(&mut self) -> CompactionStatistics {
        if let Some(last_table_id) = self.last_table_id.take() {
            self.compaction_statistics
                .delta_drop_stat
                .insert(last_table_id, std::mem::take(&mut self.last_table_stats));
        }
        std::mem::take(&mut self.compaction_statistics)
    }

    fn clear(&mut self) {
        if !self.last_key.is_empty() {
            self.last_key = FullKey::default();
        }
        self.last_key_is_delete = false;
    }

    fn reset_watermark(&mut self) {
        self.pk_prefix_skip_watermark_state.reset_watermark();
        self.non_pk_prefix_skip_watermark_state.reset_watermark();
        self.value_skip_watermark_state.reset_watermark();
    }

    /// A block whose table id is not part of this task's input tables can be
    /// skipped entirely.
    #[inline(always)]
    fn should_skip_block(&self, table_id: TableId) -> bool {
        !self.read_table_ids.contains(&table_id)
    }

    #[inline(always)]
    fn may_report_process_key(&mut self, key_count: u32) {
        const PROGRESS_KEY_INTERVAL: u32 = 100;
        self.progress_key_num += key_count;
        if self.progress_key_num > PROGRESS_KEY_INTERVAL {
            self.task_progress
                .inc_progress_key(self.progress_key_num as u64);
            self.progress_key_num = 0;
        }
    }

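    /// Compacts entries from `iter` up to and including `target_key`,
    /// dropping stale versions, filtered keys, and watermark-expired keys,
    /// and adding the survivors to the builder.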
    pub async fn run(
        &mut self,
        iter: &mut BlockIterator,
        target_key: FullKey<&[u8]>,
    ) -> HummockResult<()> {
        if self.should_skip_block(iter.table_id()) {
            iter.finish_block();
            return Ok(());
        }

        while iter.is_valid() && iter.key().le(&target_key) {
            let is_new_user_key =
                !self.last_key.is_empty() && iter.key().user_key != self.last_key.user_key.as_ref();
            self.compaction_statistics.iter_total_key_counts += 1;
            self.may_report_process_key(1);

            let mut drop = false;
            let value = HummockValue::from_slice(iter.value()).unwrap();
            let is_first_or_new_user_key = is_new_user_key || self.last_key.is_empty();
            if is_first_or_new_user_key {
                self.last_key.set(iter.key());
                self.last_key_is_delete = false;
            }

            // Only the latest version of each user key is retained; a delete
            // tombstone may additionally be GC-ed outright.
            if !self.task_config.retain_multiple_version
                && self.task_config.gc_delete_keys
                && value.is_delete()
            {
                drop = true;
                self.last_key_is_delete = true;
            } else if !self.task_config.retain_multiple_version && !is_first_or_new_user_key {
                drop = true;
            }

            if !drop && self.compaction_filter.should_delete(iter.key()) {
                drop = true;
            }

            if !drop && self.watermark_should_delete(&iter.key(), value) {
                drop = true;
                self.last_key_is_delete = true;
            }

            if self.last_table_id != Some(self.last_key.user_key.table_id) {
                if let Some(last_table_id) = self.last_table_id.take() {
                    self.compaction_statistics
                        .delta_drop_stat
                        .insert(last_table_id, std::mem::take(&mut self.last_table_stats));
                }
                self.last_table_id = Some(self.last_key.user_key.table_id);
            }

            if drop {
                self.compaction_statistics.iter_drop_key_counts += 1;

                self.last_table_stats.total_key_count -= 1;
                self.last_table_stats.total_key_size -= self.last_key.encoded_len() as i64;
                self.last_table_stats.total_value_size -= value.encoded_len() as i64;
                iter.next();
                continue;
            }
            self.builder
                .add_full_key(iter.key(), value, is_new_user_key)
                .await?;
            iter.next();
        }
        Ok(())
    }

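    /// Decides whether an entire block may be copied into the output without
    /// decoding it. Copying is only safe when no key in the block could be
    /// dropped, so this check is conservative and rejects the block on any
    /// possible deletion.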
    pub fn shall_copy_raw_block(&mut self, smallest_key: &FullKey<&[u8]>) -> bool {
        if self.should_skip_block(smallest_key.user_key.table_id) {
            return false;
        }

        // The block may start with older versions of the user key we just
        // emitted a delete tombstone for; those versions must not survive.
        if self.last_key_is_delete && self.last_key.user_key.as_ref().eq(&smallest_key.user_key) {
            return false;
        }

        if self.watermark_may_delete(smallest_key) {
            return false;
        }

        if self.compaction_filter.should_delete(*smallest_key) {
            return false;
        }

        true
    }

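    /// Conservative watermark check for the raw-block fast path: unlike
    /// `watermark_should_delete`, no value is available here, so a
    /// placeholder `Put` is probed and any possible deletion disqualifies
    /// the block from being copied.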
    fn watermark_may_delete(&mut self, key: &FullKey<&[u8]>) -> bool {
        let pk_prefix_has_watermark = self.pk_prefix_skip_watermark_state.has_watermark();
        let non_pk_prefix_has_watermark = self.non_pk_prefix_skip_watermark_state.has_watermark();
        if pk_prefix_has_watermark || non_pk_prefix_has_watermark {
            let unused = vec![];
            let unused_put = HummockValue::Put(unused.as_slice());
            if (pk_prefix_has_watermark
                && self
                    .pk_prefix_skip_watermark_state
                    .should_delete(key, unused_put))
                || (non_pk_prefix_has_watermark
                    && self
                        .non_pk_prefix_skip_watermark_state
                        .should_delete(key, unused_put))
            {
                return true;
            }
        }
        self.value_skip_watermark_state.has_watermark()
            && self.value_skip_watermark_state.may_delete(key)
    }

    fn watermark_should_delete(
        &mut self,
        key: &FullKey<&[u8]>,
        value: HummockValue<&[u8]>,
    ) -> bool {
        (self.pk_prefix_skip_watermark_state.has_watermark()
            && self
                .pk_prefix_skip_watermark_state
                .should_delete(key, value))
            || (self.non_pk_prefix_skip_watermark_state.has_watermark()
                && self
                    .non_pk_prefix_skip_watermark_state
                    .should_delete(key, value))
            || (self.value_skip_watermark_state.has_watermark()
                && self.value_skip_watermark_state.should_delete(key, value))
    }
}

#[cfg(test)]
mod tests {
    use std::collections::{HashMap, VecDeque};
    use std::sync::Arc;

    use risingwave_common::catalog::TableId;
    use risingwave_common::hash::VirtualNode;
    use risingwave_common::util::epoch::test_epoch;
    use risingwave_hummock_sdk::compact_task::CompactTask;
    use risingwave_hummock_sdk::key::FullKey;
    use risingwave_hummock_sdk::level::InputLevel;
    use risingwave_pb::hummock::compact_task::TaskType;
    use risingwave_pb::hummock::{BloomFilterType, LevelType};

    use super::CompactorRunner;
    use crate::compaction_catalog_manager::CompactionCatalogAgent;
    use crate::hummock::compactor::compaction_utils::optimize_by_copy_block;
    use crate::hummock::compactor::task_progress::TaskProgress;
    use crate::hummock::compactor::{CompactorContext, MultiCompactionFilter};
    use crate::hummock::iterator::test_utils::mock_sstable_store;
    use crate::hummock::test_utils::{
        default_builder_opt_for_test, default_opts_for_test, gen_test_sstable_impl, test_value_of,
    };
    use crate::hummock::value::HummockValue;
    use crate::hummock::{
        BlockedXor16FilterBuilder, CachePolicy, SharedComapctorObjectIdManager, Xor16FilterBuilder,
    };
    use crate::monitor::CompactorMetrics;

    fn test_key(table_id: u32, idx: usize) -> FullKey<Vec<u8>> {
        let mut table_key = VirtualNode::ZERO.to_be_bytes().to_vec();
        table_key.extend_from_slice(format!("key_test_{idx:05}").as_bytes());
        FullKey::for_test(TableId::new(table_id), table_key, test_epoch(1))
    }

    #[tokio::test]
    async fn test_fast_compact_skips_empty_table_id_sst() {
        let sstable_store = mock_sstable_store().await;
        let table_id_to_vnode = HashMap::from([
            (1, VirtualNode::COUNT_FOR_TEST),
            (2, VirtualNode::COUNT_FOR_TEST),
        ]);
        let table_id_to_watermark_serde = HashMap::from([(1, None), (2, None)]);

        // An SST whose table ids are then cleared: all of its data has been
        // dropped, so fast compaction should skip it entirely.
        let mut dropped_only_sst = gen_test_sstable_impl::<_, Xor16FilterBuilder>(
            default_builder_opt_for_test(),
            1,
            (0..2).map(|idx| (test_key(1, idx), HummockValue::put(test_value_of(idx)))),
            sstable_store.clone(),
            CachePolicy::NotFill,
            table_id_to_vnode.clone(),
            table_id_to_watermark_serde.clone(),
        )
        .await;
        assert_eq!(dropped_only_sst.bloom_filter_kind, BloomFilterType::Sstable);
        let mut inner = dropped_only_sst.get_inner();
        inner.table_ids.clear();
        dropped_only_sst.set_inner(inner);

        let live_left_sst = gen_test_sstable_impl::<_, BlockedXor16FilterBuilder>(
            default_builder_opt_for_test(),
            2,
            (0..2).map(|idx| (test_key(2, idx), HummockValue::put(test_value_of(idx)))),
            sstable_store.clone(),
            CachePolicy::NotFill,
            table_id_to_vnode.clone(),
            table_id_to_watermark_serde.clone(),
        )
        .await;
        let live_right_sst = gen_test_sstable_impl::<_, BlockedXor16FilterBuilder>(
            default_builder_opt_for_test(),
            3,
            (2..4).map(|idx| (test_key(2, idx), HummockValue::put(test_value_of(idx)))),
            sstable_store.clone(),
            CachePolicy::NotFill,
            table_id_to_vnode,
            table_id_to_watermark_serde,
        )
        .await;

        let mut storage_opts = default_opts_for_test();
        storage_opts.enable_fast_compaction = true;
        storage_opts.compactor_fast_max_compact_task_size = u64::MAX;
        storage_opts.compactor_fast_max_compact_delete_ratio = 100;
        let context = CompactorContext::new_local_compact_context(
            Arc::new(storage_opts),
            sstable_store,
            Arc::new(CompactorMetrics::unused()),
            None,
        );

        let task = CompactTask {
            input_ssts: vec![
                InputLevel {
                    level_idx: 1,
                    level_type: LevelType::Nonoverlapping,
                    table_infos: vec![dropped_only_sst, live_left_sst],
                },
                InputLevel {
                    level_idx: 2,
                    level_type: LevelType::Nonoverlapping,
                    table_infos: vec![live_right_sst],
                },
            ],
            task_id: 42,
            target_level: 2,
            existing_table_ids: vec![TableId::new(2)],
            target_file_size: 1 << 20,
            task_type: TaskType::Dynamic,
            ..Default::default()
        };

        // The SST with no table ids is filtered out of the readable inputs.
        assert_eq!(task.input_ssts[0].read_sstable_infos().count(), 1);
        assert!(optimize_by_copy_block(&task, &context));

        let runner = CompactorRunner::new(
            context,
            task,
            CompactionCatalogAgent::for_test(vec![1, 2]),
            SharedComapctorObjectIdManager::for_test(VecDeque::from([100])),
            Arc::new(TaskProgress::default()),
            MultiCompactionFilter::default(),
        );
        runner.run().await.unwrap();
    }
}
1007}