1use std::cmp::Ordering;
16use std::collections::HashSet;
17use std::marker::PhantomData;
18use std::sync::atomic::AtomicU64;
19use std::sync::{Arc, atomic};
20use std::time::Instant;
21
22use await_tree::{InstrumentAwait, SpanExt};
23use bytes::Bytes;
24use fail::fail_point;
25use itertools::Itertools;
26use risingwave_common::catalog::TableId;
27use risingwave_hummock_sdk::compact_task::CompactTask;
28use risingwave_hummock_sdk::key::FullKey;
29use risingwave_hummock_sdk::key_range::KeyRange;
30use risingwave_hummock_sdk::sstable_info::SstableInfo;
31use risingwave_hummock_sdk::table_stats::TableStats;
32use risingwave_hummock_sdk::{EpochWithGap, LocalSstableInfo, can_concat, compact_task_to_string};
33use risingwave_pb::hummock::{BloomFilterType, PbSstableFilterType};
34
35use crate::compaction_catalog_manager::CompactionCatalogAgentRef;
36use crate::hummock::block_stream::BlockDataStream;
37use crate::hummock::compactor::task_progress::TaskProgress;
38use crate::hummock::compactor::{
39 CompactionFilter, CompactionStatistics, Compactor, CompactorContext, MultiCompactionFilter,
40 RemoteBuilderFactory, TaskConfig,
41};
42use crate::hummock::iterator::{
43 NonPkPrefixSkipWatermarkState, PkPrefixSkipWatermarkState, SkipWatermarkState,
44 ValueSkipWatermarkState,
45};
46use crate::hummock::multi_builder::{CapacitySplitTableBuilder, TableBuilderFactory};
47use crate::hummock::sstable_store::SstableStoreRef;
48use crate::hummock::value::HummockValue;
49use crate::hummock::{
50 Block, BlockBuilder, BlockHolder, BlockIterator, BlockMeta, BlockedXor16FilterBuilder,
51 CachePolicy, CompressionAlgorithm, GetObjectId, HummockResult, SstableBuilderOptions,
52 TableHolder, UnifiedSstableWriterFactory,
53};
54use crate::monitor::{CompactorMetrics, StoreLocalStatistic};
55
56pub struct BlockStreamIterator {
58 block_stream: Option<BlockDataStream>,
60
61 next_block_index: usize,
62
63 sstable: TableHolder,
65 iter: Option<BlockIterator>,
66 task_progress: Arc<TaskProgress>,
67
68 sstable_store: SstableStoreRef,
70 sstable_info: SstableInfo,
71 io_retry_times: usize,
72 max_io_retry_times: usize,
73 stats_ptr: Arc<AtomicU64>,
74}
75
76impl BlockStreamIterator {
77 pub fn new(
92 sstable: TableHolder,
93 task_progress: Arc<TaskProgress>,
94 sstable_store: SstableStoreRef,
95 sstable_info: SstableInfo,
96 max_io_retry_times: usize,
97 stats_ptr: Arc<AtomicU64>,
98 ) -> Self {
99 Self {
100 block_stream: None,
101 next_block_index: 0,
102 sstable,
103 iter: None,
104 task_progress,
105 sstable_store,
106 sstable_info,
107 io_retry_times: 0,
108 max_io_retry_times,
109 stats_ptr,
110 }
111 }
112
113 async fn create_stream(&mut self) -> HummockResult<()> {
114 let block_stream = self
117 .sstable_store
118 .get_stream_for_blocks(
119 self.sstable_info.object_id,
120 &self.sstable.meta.block_metas[self.next_block_index..],
121 )
122 .instrument_await("stream_iter_get_stream".verbose())
123 .await?;
124 self.block_stream = Some(block_stream);
125 Ok(())
126 }
127
128 pub(crate) async fn download_next_block(
130 &mut self,
131 ) -> HummockResult<Option<(Bytes, Vec<u8>, BlockMeta)>> {
132 let now = Instant::now();
133 let _time_stat = scopeguard::guard(self.stats_ptr.clone(), |stats_ptr: Arc<AtomicU64>| {
134 let add = (now.elapsed().as_secs_f64() * 1000.0).ceil();
135 stats_ptr.fetch_add(add as u64, atomic::Ordering::Relaxed);
136 });
137 loop {
138 let ret = match &mut self.block_stream {
139 Some(block_stream) => block_stream.next_block_impl().await,
140 None => {
141 self.create_stream().await?;
142 continue;
143 }
144 };
145 match ret {
146 Ok(Some((data, _))) => {
147 let meta = self.sstable.meta.block_metas[self.next_block_index].clone();
148 let filter_block = self
149 .sstable
150 .filter_reader
151 .get_block_raw_filter(self.next_block_index);
152 self.next_block_index += 1;
153 return Ok(Some((data, filter_block, meta)));
154 }
155
156 Ok(None) => break,
157
158 Err(e) => {
159 if !e.is_object_error() || self.io_retry_times >= self.max_io_retry_times {
160 return Err(e);
161 }
162
163 self.block_stream.take();
164 self.io_retry_times += 1;
165 fail_point!("create_stream_err");
166
167 tracing::warn!(
168 "fast compact retry create stream for sstable {} times, sstinfo={}",
169 self.io_retry_times,
170 format!(
171 "object_id={}, sst_id={}, meta_offset={}, table_ids={:?}",
172 self.sstable_info.object_id,
173 self.sstable_info.sst_id,
174 self.sstable_info.meta_offset,
175 self.sstable_info.table_ids
176 )
177 );
178 }
179 }
180 }
181
182 self.next_block_index = self.sstable.meta.block_metas.len();
183 self.iter.take();
184 Ok(None)
185 }
186
187 pub(crate) fn init_block_iter(
188 &mut self,
189 buf: Bytes,
190 uncompressed_capacity: usize,
191 ) -> HummockResult<()> {
192 let block = Block::decode(buf, uncompressed_capacity)?;
193 let mut iter = BlockIterator::new(BlockHolder::from_owned_block(Box::new(block)));
194 iter.seek_to_first();
195 self.iter = Some(iter);
196 Ok(())
197 }
198
199 fn next_block_smallest(&self) -> &[u8] {
200 self.sstable.meta.block_metas[self.next_block_index]
201 .smallest_key
202 .as_ref()
203 }
204
205 fn next_block_largest(&self) -> &[u8] {
206 if self.next_block_index + 1 < self.sstable.meta.block_metas.len() {
207 self.sstable.meta.block_metas[self.next_block_index + 1]
208 .smallest_key
209 .as_ref()
210 } else {
211 self.sstable.meta.largest_key.as_ref()
212 }
213 }
214
215 fn current_block_largest(&self) -> Vec<u8> {
216 if self.next_block_index < self.sstable.meta.block_metas.len() {
217 let mut largest_key = FullKey::decode(
218 self.sstable.meta.block_metas[self.next_block_index]
219 .smallest_key
220 .as_ref(),
221 );
222 largest_key.epoch_with_gap = EpochWithGap::new_max_epoch();
224 largest_key.encode()
225 } else {
226 self.sstable.meta.largest_key.clone()
227 }
228 }
229
230 fn key(&self) -> FullKey<&[u8]> {
231 match self.iter.as_ref() {
232 Some(iter) => iter.key(),
233 None => FullKey::decode(
234 self.sstable.meta.block_metas[self.next_block_index]
235 .smallest_key
236 .as_ref(),
237 ),
238 }
239 }
240
241 pub(crate) fn is_valid(&self) -> bool {
242 self.iter.is_some() || self.next_block_index < self.sstable.meta.block_metas.len()
243 }
244
245 #[cfg(test)]
246 #[cfg(feature = "failpoints")]
247 pub(crate) fn iter_mut(&mut self) -> &mut BlockIterator {
248 self.iter.as_mut().unwrap()
249 }
250}
251
252impl Drop for BlockStreamIterator {
253 fn drop(&mut self) {
254 self.task_progress.dec_num_pending_read_io();
255 }
256}
257
258pub struct ConcatSstableIterator {
261 sstable_iter: Option<BlockStreamIterator>,
263
264 cur_idx: usize,
266
267 sstables: Vec<SstableInfo>,
269
270 sstable_store: SstableStoreRef,
271
272 stats: StoreLocalStatistic,
273 task_progress: Arc<TaskProgress>,
274
275 max_io_retry_times: usize,
276}
277
278impl ConcatSstableIterator {
279 pub fn new(
283 sst_infos: Vec<SstableInfo>,
284 sstable_store: SstableStoreRef,
285 task_progress: Arc<TaskProgress>,
286 max_io_retry_times: usize,
287 ) -> Self {
288 Self {
289 sstable_iter: None,
290 cur_idx: 0,
291 sstables: sst_infos,
292 sstable_store,
293 task_progress,
294 stats: StoreLocalStatistic::default(),
295 max_io_retry_times,
296 }
297 }
298
299 pub async fn rewind(&mut self) -> HummockResult<()> {
300 self.seek_idx(0).await
301 }
302
303 pub async fn next_sstable(&mut self) -> HummockResult<()> {
304 self.seek_idx(self.cur_idx + 1).await
305 }
306
307 pub fn current_sstable(&mut self) -> &mut BlockStreamIterator {
308 self.sstable_iter.as_mut().unwrap()
309 }
310
311 pub async fn init_block_iter(&mut self) -> HummockResult<()> {
312 if let Some(sstable) = self.sstable_iter.as_mut() {
313 if sstable.iter.is_some() {
314 return Ok(());
315 }
316 let (buf, _, meta) = sstable.download_next_block().await?.unwrap();
317 sstable.init_block_iter(buf, meta.uncompressed_size as usize)?;
318 }
319 Ok(())
320 }
321
322 pub fn is_valid(&self) -> bool {
323 self.cur_idx < self.sstables.len()
324 }
325
326 async fn seek_idx(&mut self, idx: usize) -> HummockResult<()> {
328 self.sstable_iter.take();
329 self.cur_idx = idx;
330 if self.cur_idx < self.sstables.len() {
331 let sstable_info = &self.sstables[self.cur_idx];
332 let sstable = self
333 .sstable_store
334 .sstable(sstable_info, &mut self.stats)
335 .instrument_await("stream_iter_sstable".verbose())
336 .await?;
337 self.task_progress.inc_num_pending_read_io();
338
339 let sstable_iter = BlockStreamIterator::new(
340 sstable,
341 self.task_progress.clone(),
342 self.sstable_store.clone(),
343 sstable_info.clone(),
344 self.max_io_retry_times,
345 self.stats.remote_io_time.clone(),
346 );
347 self.sstable_iter = Some(sstable_iter);
348 }
349 Ok(())
350 }
351}
352
353pub struct CompactorRunner<C: CompactionFilter = MultiCompactionFilter> {
354 left: Box<ConcatSstableIterator>,
355 right: Box<ConcatSstableIterator>,
356 task_id: u64,
357 executor: CompactTaskExecutor<
358 RemoteBuilderFactory<UnifiedSstableWriterFactory, BlockedXor16FilterBuilder>,
359 C,
360 >,
361 compression_algorithm: CompressionAlgorithm,
362 metrics: Arc<CompactorMetrics>,
363}
364
365impl<C: CompactionFilter> CompactorRunner<C> {
366 pub fn new(
367 context: CompactorContext,
368 task: CompactTask,
369 compaction_catalog_agent_ref: CompactionCatalogAgentRef,
370 object_id_getter: Arc<dyn GetObjectId>,
371 task_progress: Arc<TaskProgress>,
372 compaction_filter: C,
373 ) -> Self {
374 let mut options: SstableBuilderOptions = context.storage_opts.as_ref().into();
375 let compression_algorithm: CompressionAlgorithm = task.compression_algorithm.into();
376 options.compression_algorithm = compression_algorithm;
377 options.capacity = task.target_file_size as usize;
378 options.max_vnode_key_range_bytes = None;
380 let get_id_time = Arc::new(AtomicU64::new(0));
381
382 debug_assert_eq!(
383 task.sstable_filter_kind,
384 PbSstableFilterType::SstableFilterXor16,
385 "fast compaction only supports blocked xor16 filter today"
386 );
387 debug_assert!(
388 task.should_use_block_based_filter(),
389 "fast compaction can only preserve blocked filters; expected blocked output"
390 );
391
392 let key_range = KeyRange::inf();
393 let read_table_ids = HashSet::from_iter(task.get_table_ids_from_input_ssts());
394
395 let task_config = TaskConfig {
396 key_range,
397 cache_policy: CachePolicy::NotFill,
398 gc_delete_keys: task.gc_delete_keys,
399 retain_multiple_version: false,
400 table_vnode_partition: task.table_vnode_partition.clone(),
401 use_block_based_filter: true,
402 sstable_filter_kind: task.sstable_filter_kind,
403 table_schemas: Default::default(),
404 disable_drop_column_optimization: false,
405 };
406 let factory = UnifiedSstableWriterFactory::new(context.sstable_store.clone());
407
408 let builder_factory = RemoteBuilderFactory::<_, BlockedXor16FilterBuilder> {
409 object_id_getter,
410 limiter: context.memory_limiter.clone(),
411 options,
412 policy: task_config.cache_policy,
413 remote_rpc_cost: get_id_time,
414 compaction_catalog_agent_ref: compaction_catalog_agent_ref.clone(),
415 sstable_writer_factory: factory,
416 _phantom: PhantomData,
417 };
418 let sst_builder = CapacitySplitTableBuilder::new(
419 builder_factory,
420 context.compactor_metrics.clone(),
421 Some(task_progress.clone()),
422 task_config.table_vnode_partition.clone(),
423 context
424 .storage_opts
425 .compactor_concurrent_uploading_sst_count,
426 compaction_catalog_agent_ref.clone(),
427 );
428 assert_eq!(
429 task.input_ssts.len(),
430 2,
431 "TaskId {} target_level {:?} task {:?}",
432 task.task_id,
433 task.target_level,
434 compact_task_to_string(&task)
435 );
436 let left_ssts = task.input_ssts[0]
437 .read_sstable_infos()
438 .cloned()
439 .collect_vec();
440 let right_ssts = task.input_ssts[1]
441 .read_sstable_infos()
442 .cloned()
443 .collect_vec();
444 assert!(
445 left_ssts
446 .iter()
447 .chain(right_ssts.iter())
448 .all(|sst| sst.bloom_filter_kind == BloomFilterType::Blocked),
449 "fast compaction requires blocked-filter SSTs: {}",
450 compact_task_to_string(&task)
451 );
452 let left = Box::new(ConcatSstableIterator::new(
453 left_ssts,
454 context.sstable_store.clone(),
455 task_progress.clone(),
456 context.storage_opts.compactor_iter_max_io_retry_times,
457 ));
458 let right = Box::new(ConcatSstableIterator::new(
459 right_ssts,
460 context.sstable_store,
461 task_progress.clone(),
462 context.storage_opts.compactor_iter_max_io_retry_times,
463 ));
464
465 let pk_prefix_state = PkPrefixSkipWatermarkState::from_safe_epoch_watermarks(
467 task.pk_prefix_table_watermarks.clone(),
468 );
469 let non_pk_prefix_state = NonPkPrefixSkipWatermarkState::from_safe_epoch_watermarks(
470 task.non_pk_prefix_table_watermarks.clone(),
471 compaction_catalog_agent_ref.clone(),
472 );
473 let value_skip_watermark_state = ValueSkipWatermarkState::from_safe_epoch_watermarks(
474 task.value_table_watermarks.clone(),
475 compaction_catalog_agent_ref,
476 );
477
478 Self {
479 executor: CompactTaskExecutor::new(
480 sst_builder,
481 task_config,
482 task_progress,
483 pk_prefix_state,
484 non_pk_prefix_state,
485 value_skip_watermark_state,
486 compaction_filter,
487 read_table_ids,
488 ),
489 left,
490 right,
491 task_id: task.task_id,
492 metrics: context.compactor_metrics,
493 compression_algorithm,
494 }
495 }
496
497 pub async fn run(mut self) -> HummockResult<(Vec<LocalSstableInfo>, CompactionStatistics)> {
498 self.left.rewind().await?;
499 self.right.rewind().await?;
500 let mut skip_raw_block_count = 0;
501 let mut skip_raw_block_size = 0;
502 while self.left.is_valid() && self.right.is_valid() {
503 let ret = self
504 .left
505 .current_sstable()
506 .key()
507 .cmp(&self.right.current_sstable().key());
508 let (first, second) = if ret == Ordering::Less {
509 (&mut self.left, &mut self.right)
510 } else {
511 (&mut self.right, &mut self.left)
512 };
513 assert!(
514 ret != Ordering::Equal,
515 "sst range overlap equal_key {:?}",
516 self.left.current_sstable().key()
517 );
518 if first.current_sstable().iter.is_none() {
519 let right_key = second.current_sstable().key();
520 while first.current_sstable().is_valid() && !self.executor.builder.need_flush() {
521 let full_key = FullKey::decode(first.current_sstable().next_block_largest());
522 if full_key.user_key.ge(&right_key.user_key) {
525 break;
526 }
527 let smallest_key =
528 FullKey::decode(first.current_sstable().next_block_smallest());
529 if !self.executor.shall_copy_raw_block(&smallest_key) {
530 break;
531 }
532 let smallest_key = smallest_key.to_vec();
533
534 let (mut block, filter_data, mut meta) = first
535 .current_sstable()
536 .download_next_block()
537 .await?
538 .unwrap();
539 let algorithm = Block::get_algorithm(&block)?;
540 if algorithm == CompressionAlgorithm::None
541 && algorithm != self.compression_algorithm
542 {
543 block = BlockBuilder::compress_block(block, self.compression_algorithm)?;
544 meta.len = block.len() as u32;
545 }
546
547 let largest_key = first.current_sstable().current_block_largest();
548 let block_len = block.len() as u64;
549 let block_key_count = meta.total_key_count;
550
551 if self
552 .executor
553 .builder
554 .add_raw_block(block, filter_data, smallest_key, largest_key, meta)
555 .await?
556 {
557 skip_raw_block_size += block_len;
558 skip_raw_block_count += 1;
559 }
560 self.executor.may_report_process_key(block_key_count);
561 self.executor.clear();
562 }
563 if !first.current_sstable().is_valid() {
564 first.next_sstable().await?;
565 continue;
566 }
567 first.init_block_iter().await?;
568 }
569
570 let target_key = second.current_sstable().key();
571 let iter = first.sstable_iter.as_mut().unwrap().iter.as_mut().unwrap();
572 self.executor.reset_watermark();
573 self.executor.run(iter, target_key).await?;
574 if !iter.is_valid() {
575 first.sstable_iter.as_mut().unwrap().iter.take();
576 if !first.current_sstable().is_valid() {
577 first.next_sstable().await?;
578 }
579 }
580 }
581 let rest_data = if !self.left.is_valid() {
582 &mut self.right
583 } else {
584 &mut self.left
585 };
586 if rest_data.is_valid() {
587 let sstable_iter = rest_data.sstable_iter.as_mut().unwrap();
589 let target_key = FullKey::decode(&sstable_iter.sstable.meta.largest_key);
590 if let Some(iter) = sstable_iter.iter.as_mut() {
591 self.executor.reset_watermark();
592 self.executor.run(iter, target_key).await?;
593 assert!(
594 !iter.is_valid(),
595 "iter should not be valid key {:?}",
596 iter.key()
597 );
598 }
599 sstable_iter.iter.take();
600 }
601
602 while rest_data.is_valid() {
603 let mut sstable_iter = rest_data.sstable_iter.take().unwrap();
604 while sstable_iter.is_valid() {
605 let smallest_key = FullKey::decode(sstable_iter.next_block_smallest()).to_vec();
606 let (block, filter_data, block_meta) =
607 sstable_iter.download_next_block().await?.unwrap();
608 let need_deleted = self.executor.last_key.user_key.eq(&smallest_key.user_key)
610 && self.executor.last_key_is_delete;
611 if self.executor.builder.need_flush()
612 || need_deleted
613 || !self.executor.shall_copy_raw_block(&smallest_key.to_ref())
614 {
615 let largest_key = sstable_iter.sstable.meta.largest_key.clone();
616 let target_key = FullKey::decode(&largest_key);
617 sstable_iter.init_block_iter(block, block_meta.uncompressed_size as usize)?;
618 let mut iter = sstable_iter.iter.take().unwrap();
619 self.executor.reset_watermark();
620 self.executor.run(&mut iter, target_key).await?;
621 } else {
622 let largest_key = sstable_iter.current_block_largest();
623 let block_len = block.len() as u64;
624 let block_key_count = block_meta.total_key_count;
625 if self
626 .executor
627 .builder
628 .add_raw_block(block, filter_data, smallest_key, largest_key, block_meta)
629 .await?
630 {
631 skip_raw_block_count += 1;
632 skip_raw_block_size += block_len;
633 }
634 self.executor.may_report_process_key(block_key_count);
635 self.executor.clear();
636 }
637 }
638 rest_data.next_sstable().await?;
639 }
640 let mut total_read_bytes = 0;
641 for sst in &self.left.sstables {
642 total_read_bytes += sst.sst_size;
643 }
644 for sst in &self.right.sstables {
645 total_read_bytes += sst.sst_size;
646 }
647 self.metrics
648 .compact_fast_runner_bytes
649 .inc_by(skip_raw_block_size);
650 tracing::info!(
651 "OPTIMIZATION: skip {} blocks for task-{}, optimize {}% data compression",
652 skip_raw_block_count,
653 self.task_id,
654 skip_raw_block_size * 100 / total_read_bytes,
655 );
656
657 let statistic = self.executor.take_statistics();
658 let output_ssts = self.executor.builder.finish().await?;
659 Compactor::report_progress(
660 self.metrics.clone(),
661 Some(self.executor.task_progress.clone()),
662 &output_ssts,
663 false,
664 );
665 let sst_infos = output_ssts
666 .iter()
667 .map(|sst| sst.sst_info.clone())
668 .collect_vec();
669 assert!(can_concat(&sst_infos));
670 Ok((output_ssts, statistic))
671 }
672}
673
674pub struct CompactTaskExecutor<F: TableBuilderFactory, C: CompactionFilter> {
675 last_key: FullKey<Vec<u8>>,
676 compaction_statistics: CompactionStatistics,
677 last_table_id: Option<TableId>,
678 last_table_stats: TableStats,
679 builder: CapacitySplitTableBuilder<F>,
680 task_config: TaskConfig,
681 task_progress: Arc<TaskProgress>,
682 pk_prefix_skip_watermark_state: PkPrefixSkipWatermarkState,
683 last_key_is_delete: bool,
684 progress_key_num: u32,
685 non_pk_prefix_skip_watermark_state: NonPkPrefixSkipWatermarkState,
686 value_skip_watermark_state: ValueSkipWatermarkState,
687 compaction_filter: C,
688 read_table_ids: HashSet<TableId>,
689}
690
691impl<F: TableBuilderFactory, C: CompactionFilter> CompactTaskExecutor<F, C> {
692 pub fn new(
693 builder: CapacitySplitTableBuilder<F>,
694 task_config: TaskConfig,
695 task_progress: Arc<TaskProgress>,
696 pk_prefix_skip_watermark_state: PkPrefixSkipWatermarkState,
697 non_pk_prefix_skip_watermark_state: NonPkPrefixSkipWatermarkState,
698 value_skip_watermark_state: ValueSkipWatermarkState,
699 compaction_filter: C,
700 read_table_ids: HashSet<TableId>,
701 ) -> Self {
702 Self {
703 builder,
704 task_config,
705 last_key: FullKey::default(),
706 last_key_is_delete: false,
707 compaction_statistics: CompactionStatistics::default(),
708 last_table_id: None,
709 last_table_stats: TableStats::default(),
710 task_progress,
711 pk_prefix_skip_watermark_state,
712 progress_key_num: 0,
713 non_pk_prefix_skip_watermark_state,
714 value_skip_watermark_state,
715 compaction_filter,
716 read_table_ids,
717 }
718 }
719
720 fn take_statistics(&mut self) -> CompactionStatistics {
721 if let Some(last_table_id) = self.last_table_id.take() {
722 self.compaction_statistics
723 .delta_drop_stat
724 .insert(last_table_id, std::mem::take(&mut self.last_table_stats));
725 }
726 std::mem::take(&mut self.compaction_statistics)
727 }
728
729 fn clear(&mut self) {
730 if !self.last_key.is_empty() {
731 self.last_key = FullKey::default();
732 }
733 self.last_key_is_delete = false;
734 }
735
736 fn reset_watermark(&mut self) {
737 self.pk_prefix_skip_watermark_state.reset_watermark();
738 self.non_pk_prefix_skip_watermark_state.reset_watermark();
739 self.value_skip_watermark_state.reset_watermark();
740 }
741
742 #[inline(always)]
743 fn should_skip_block(&self, table_id: TableId) -> bool {
744 !self.read_table_ids.contains(&table_id)
745 }
746
747 #[inline(always)]
748 fn may_report_process_key(&mut self, key_count: u32) {
749 const PROGRESS_KEY_INTERVAL: u32 = 100;
750 self.progress_key_num += key_count;
751 if self.progress_key_num > PROGRESS_KEY_INTERVAL {
752 self.task_progress
753 .inc_progress_key(self.progress_key_num as u64);
754 self.progress_key_num = 0;
755 }
756 }
757
758 pub async fn run(
759 &mut self,
760 iter: &mut BlockIterator,
761 target_key: FullKey<&[u8]>,
762 ) -> HummockResult<()> {
763 if self.should_skip_block(iter.table_id()) {
764 iter.finish_block();
765 return Ok(());
766 }
767
768 while iter.is_valid() && iter.key().le(&target_key) {
769 let is_new_user_key =
770 !self.last_key.is_empty() && iter.key().user_key != self.last_key.user_key.as_ref();
771 self.compaction_statistics.iter_total_key_counts += 1;
772 self.may_report_process_key(1);
773
774 let mut drop = false;
775 let value = HummockValue::from_slice(iter.value()).unwrap();
776 let is_first_or_new_user_key = is_new_user_key || self.last_key.is_empty();
777 if is_first_or_new_user_key {
778 self.last_key.set(iter.key());
779 self.last_key_is_delete = false;
780 }
781
782 if !self.task_config.retain_multiple_version
784 && self.task_config.gc_delete_keys
785 && value.is_delete()
786 {
787 drop = true;
788 self.last_key_is_delete = true;
789 } else if !self.task_config.retain_multiple_version && !is_first_or_new_user_key {
790 drop = true;
791 }
792
793 if !drop && self.compaction_filter.should_delete(iter.key()) {
794 drop = true;
795 }
796
797 if !drop && self.watermark_should_delete(&iter.key(), value) {
798 drop = true;
799 self.last_key_is_delete = true;
800 }
801
802 if self.last_table_id != Some(self.last_key.user_key.table_id) {
803 if let Some(last_table_id) = self.last_table_id.take() {
804 self.compaction_statistics
805 .delta_drop_stat
806 .insert(last_table_id, std::mem::take(&mut self.last_table_stats));
807 }
808 self.last_table_id = Some(self.last_key.user_key.table_id);
809 }
810
811 if drop {
812 self.compaction_statistics.iter_drop_key_counts += 1;
813
814 self.last_table_stats.total_key_count -= 1;
815 self.last_table_stats.total_key_size -= self.last_key.encoded_len() as i64;
816 self.last_table_stats.total_value_size -= value.encoded_len() as i64;
817 iter.next();
818 continue;
819 }
820 self.builder
821 .add_full_key(iter.key(), value, is_new_user_key)
822 .await?;
823 iter.next();
824 }
825 Ok(())
826 }
827
828 pub fn shall_copy_raw_block(&mut self, smallest_key: &FullKey<&[u8]>) -> bool {
829 if self.should_skip_block(smallest_key.user_key.table_id) {
830 return false;
832 }
833
834 if self.last_key_is_delete && self.last_key.user_key.as_ref().eq(&smallest_key.user_key) {
835 return false;
838 }
839
840 if self.watermark_may_delete(smallest_key) {
841 return false;
842 }
843
844 if self.compaction_filter.should_delete(*smallest_key) {
846 return false;
847 }
848
849 true
850 }
851
852 fn watermark_may_delete(&mut self, key: &FullKey<&[u8]>) -> bool {
853 let pk_prefix_has_watermark = self.pk_prefix_skip_watermark_state.has_watermark();
855 let non_pk_prefix_has_watermark = self.non_pk_prefix_skip_watermark_state.has_watermark();
856 if pk_prefix_has_watermark || non_pk_prefix_has_watermark {
857 let unused = vec![];
858 let unused_put = HummockValue::Put(unused.as_slice());
859 if (pk_prefix_has_watermark
860 && self
861 .pk_prefix_skip_watermark_state
862 .should_delete(key, unused_put))
863 || (non_pk_prefix_has_watermark
864 && self
865 .non_pk_prefix_skip_watermark_state
866 .should_delete(key, unused_put))
867 {
868 return true;
869 }
870 }
871 self.value_skip_watermark_state.has_watermark()
872 && self.value_skip_watermark_state.may_delete(key)
873 }
874
875 fn watermark_should_delete(
876 &mut self,
877 key: &FullKey<&[u8]>,
878 value: HummockValue<&[u8]>,
879 ) -> bool {
880 (self.pk_prefix_skip_watermark_state.has_watermark()
881 && self
882 .pk_prefix_skip_watermark_state
883 .should_delete(key, value))
884 || (self.non_pk_prefix_skip_watermark_state.has_watermark()
885 && self
886 .non_pk_prefix_skip_watermark_state
887 .should_delete(key, value))
888 || (self.value_skip_watermark_state.has_watermark()
889 && self.value_skip_watermark_state.should_delete(key, value))
890 }
891}
892
#[cfg(test)]
mod tests {
    use std::collections::{HashMap, VecDeque};
    use std::sync::Arc;

    use risingwave_common::catalog::TableId;
    use risingwave_common::hash::VirtualNode;
    use risingwave_common::util::epoch::test_epoch;
    use risingwave_hummock_sdk::compact_task::CompactTask;
    use risingwave_hummock_sdk::key::FullKey;
    use risingwave_hummock_sdk::level::InputLevel;
    use risingwave_pb::hummock::compact_task::TaskType;
    use risingwave_pb::hummock::{BloomFilterType, LevelType, PbSstableFilterType};

    use super::CompactorRunner;
    use crate::compaction_catalog_manager::CompactionCatalogAgent;
    use crate::hummock::compactor::compaction_utils::optimize_by_copy_block;
    use crate::hummock::compactor::task_progress::TaskProgress;
    use crate::hummock::compactor::{CompactorContext, MultiCompactionFilter};
    use crate::hummock::iterator::test_utils::mock_sstable_store;
    use crate::hummock::test_utils::{
        default_builder_opt_for_test, default_opts_for_test, gen_test_sstable_impl, test_value_of,
    };
    use crate::hummock::value::HummockValue;
    use crate::hummock::{
        BlockedXor16FilterBuilder, CachePolicy, SharedComapctorObjectIdManager, Xor16FilterBuilder,
    };
    use crate::monitor::CompactorMetrics;

    /// Full key of `table_id` on vnode zero with table key `key_test_<idx>` at test epoch 1.
    fn test_key(table_id: u32, idx: usize) -> FullKey<Vec<u8>> {
        let suffix = format!("key_test_{idx:05}");
        let mut table_key = VirtualNode::ZERO.to_be_bytes().to_vec();
        table_key.extend_from_slice(suffix.as_bytes());
        FullKey::for_test(TableId::new(table_id), table_key, test_epoch(1))
    }

    /// An input SST whose `table_ids` is empty must not be read by the fast compactor,
    /// even though it carries a non-blocked filter that the runner would otherwise reject.
    #[tokio::test]
    async fn test_fast_compact_skips_empty_table_id_sst() {
        let sstable_store = mock_sstable_store().await;
        let vnode_counts = HashMap::from([
            (1, VirtualNode::COUNT_FOR_TEST),
            (2, VirtualNode::COUNT_FOR_TEST),
        ]);
        let watermark_serdes = HashMap::from([(1, None), (2, None)]);

        // SST of table 1 built with an SST-level filter; its table ids are cleared below.
        let mut dropped_only_sst = gen_test_sstable_impl::<_, Xor16FilterBuilder>(
            default_builder_opt_for_test(),
            1,
            (0..2).map(|idx| (test_key(1, idx), HummockValue::put(test_value_of(idx)))),
            sstable_store.clone(),
            CachePolicy::NotFill,
            vnode_counts.clone(),
            watermark_serdes.clone(),
        )
        .await;
        assert_eq!(dropped_only_sst.bloom_filter_kind, BloomFilterType::Sstable);
        let mut sst_inner = dropped_only_sst.get_inner();
        sst_inner.table_ids.clear();
        dropped_only_sst.set_inner(sst_inner);

        // Two live SSTs of table 2 with blocked filters and disjoint key ranges.
        let live_left_sst = gen_test_sstable_impl::<_, BlockedXor16FilterBuilder>(
            default_builder_opt_for_test(),
            2,
            (0..2).map(|idx| (test_key(2, idx), HummockValue::put(test_value_of(idx)))),
            sstable_store.clone(),
            CachePolicy::NotFill,
            vnode_counts.clone(),
            watermark_serdes.clone(),
        )
        .await;
        let live_right_sst = gen_test_sstable_impl::<_, BlockedXor16FilterBuilder>(
            default_builder_opt_for_test(),
            3,
            (2..4).map(|idx| (test_key(2, idx), HummockValue::put(test_value_of(idx)))),
            sstable_store.clone(),
            CachePolicy::NotFill,
            vnode_counts,
            watermark_serdes,
        )
        .await;

        let mut storage_opts = default_opts_for_test();
        storage_opts.enable_fast_compaction = true;
        storage_opts.compactor_fast_max_compact_task_size = u64::MAX;
        storage_opts.compactor_fast_max_compact_delete_ratio = 100;
        let context = CompactorContext::new_local_compact_context(
            Arc::new(storage_opts),
            sstable_store,
            Arc::new(CompactorMetrics::unused()),
            None,
        );

        let upper_level = InputLevel {
            level_idx: 1,
            level_type: LevelType::Nonoverlapping,
            table_infos: vec![dropped_only_sst, live_left_sst],
        };
        let lower_level = InputLevel {
            level_idx: 2,
            level_type: LevelType::Nonoverlapping,
            table_infos: vec![live_right_sst],
        };
        let task = CompactTask {
            input_ssts: vec![upper_level, lower_level],
            task_id: 42,
            target_level: 2,
            existing_table_ids: vec![TableId::new(2)],
            target_file_size: 1 << 20,
            task_type: TaskType::Dynamic,
            blocked_xor_filter_kv_count_threshold: Some(0),
            sstable_filter_kind: PbSstableFilterType::SstableFilterXor16,
            ..Default::default()
        };

        // Only the live SST of the first level is visible to readers.
        assert_eq!(task.input_ssts[0].read_sstable_infos().count(), 1);
        assert!(optimize_by_copy_block(&task, &context));

        let runner = CompactorRunner::new(
            context,
            task,
            CompactionCatalogAgent::for_test(vec![1, 2]),
            SharedComapctorObjectIdManager::for_test(VecDeque::from([100])),
            Arc::new(TaskProgress::default()),
            MultiCompactionFilter::default(),
        );
        runner.run().await.unwrap();
    }
}