1use std::cmp::Ordering;
16use std::collections::HashSet;
17use std::marker::PhantomData;
18use std::sync::atomic::AtomicU64;
19use std::sync::{Arc, atomic};
20use std::time::Instant;
21
22use await_tree::{InstrumentAwait, SpanExt};
23use bytes::Bytes;
24use fail::fail_point;
25use itertools::Itertools;
26use risingwave_common::catalog::TableId;
27use risingwave_hummock_sdk::compact_task::CompactTask;
28use risingwave_hummock_sdk::key::FullKey;
29use risingwave_hummock_sdk::key_range::KeyRange;
30use risingwave_hummock_sdk::sstable_info::SstableInfo;
31use risingwave_hummock_sdk::table_stats::TableStats;
32use risingwave_hummock_sdk::{EpochWithGap, LocalSstableInfo, can_concat, compact_task_to_string};
33use risingwave_pb::hummock::{BloomFilterType, PbSstableFilterType};
34
35use crate::compaction_catalog_manager::CompactionCatalogAgentRef;
36use crate::hummock::block_stream::BlockDataStream;
37use crate::hummock::compactor::compaction_utils::{
38 blocked_xor_filter_key_count_threshold, estimate_output_key_count_for_task,
39};
40use crate::hummock::compactor::task_progress::TaskProgress;
41use crate::hummock::compactor::{
42 CompactionFilter, CompactionStatistics, Compactor, CompactorContext, MultiCompactionFilter,
43 RemoteBuilderFactory, TaskConfig,
44};
45use crate::hummock::iterator::{
46 NonPkPrefixSkipWatermarkState, PkPrefixSkipWatermarkState, SkipWatermarkState,
47 ValueSkipWatermarkState,
48};
49use crate::hummock::multi_builder::{CapacitySplitTableBuilder, TableBuilderFactory};
50use crate::hummock::sstable_store::SstableStoreRef;
51use crate::hummock::value::HummockValue;
52use crate::hummock::{
53 Block, BlockBuilder, BlockHolder, BlockIterator, BlockMeta, BlockedXor16FilterBuilder,
54 CachePolicy, CompressionAlgorithm, GetObjectId, HummockResult, SstableBuilderOptions,
55 TableHolder, UnifiedSstableWriterFactory,
56};
57use crate::monitor::{CompactorMetrics, StoreLocalStatistic};
58
59pub struct BlockStreamIterator {
61 block_stream: Option<BlockDataStream>,
63
64 next_block_index: usize,
65
66 sstable: TableHolder,
68 iter: Option<BlockIterator>,
69 task_progress: Arc<TaskProgress>,
70
71 sstable_store: SstableStoreRef,
73 sstable_info: SstableInfo,
74 io_retry_times: usize,
75 max_io_retry_times: usize,
76 stats_ptr: Arc<AtomicU64>,
77}
78
79impl BlockStreamIterator {
80 pub fn new(
95 sstable: TableHolder,
96 task_progress: Arc<TaskProgress>,
97 sstable_store: SstableStoreRef,
98 sstable_info: SstableInfo,
99 max_io_retry_times: usize,
100 stats_ptr: Arc<AtomicU64>,
101 ) -> Self {
102 Self {
103 block_stream: None,
104 next_block_index: 0,
105 sstable,
106 iter: None,
107 task_progress,
108 sstable_store,
109 sstable_info,
110 io_retry_times: 0,
111 max_io_retry_times,
112 stats_ptr,
113 }
114 }
115
116 async fn create_stream(&mut self) -> HummockResult<()> {
117 let block_stream = self
120 .sstable_store
121 .get_stream_for_blocks(
122 self.sstable_info.object_id,
123 &self.sstable.meta.block_metas[self.next_block_index..],
124 )
125 .instrument_await("stream_iter_get_stream".verbose())
126 .await?;
127 self.block_stream = Some(block_stream);
128 Ok(())
129 }
130
131 pub(crate) async fn download_next_block(
133 &mut self,
134 ) -> HummockResult<Option<(Bytes, Vec<u8>, BlockMeta)>> {
135 let now = Instant::now();
136 let _time_stat = scopeguard::guard(self.stats_ptr.clone(), |stats_ptr: Arc<AtomicU64>| {
137 let add = (now.elapsed().as_secs_f64() * 1000.0).ceil();
138 stats_ptr.fetch_add(add as u64, atomic::Ordering::Relaxed);
139 });
140 loop {
141 let ret = match &mut self.block_stream {
142 Some(block_stream) => block_stream.next_block_impl().await,
143 None => {
144 self.create_stream().await?;
145 continue;
146 }
147 };
148 match ret {
149 Ok(Some((data, _))) => {
150 let meta = self.sstable.meta.block_metas[self.next_block_index].clone();
151 let filter_block = self
152 .sstable
153 .filter_reader
154 .get_block_raw_filter(self.next_block_index);
155 self.next_block_index += 1;
156 return Ok(Some((data, filter_block, meta)));
157 }
158
159 Ok(None) => break,
160
161 Err(e) => {
162 if !e.is_object_error() || self.io_retry_times >= self.max_io_retry_times {
163 return Err(e);
164 }
165
166 self.block_stream.take();
167 self.io_retry_times += 1;
168 fail_point!("create_stream_err");
169
170 tracing::warn!(
171 "fast compact retry create stream for sstable {} times, sstinfo={}",
172 self.io_retry_times,
173 format!(
174 "object_id={}, sst_id={}, meta_offset={}, table_ids={:?}",
175 self.sstable_info.object_id,
176 self.sstable_info.sst_id,
177 self.sstable_info.meta_offset,
178 self.sstable_info.table_ids
179 )
180 );
181 }
182 }
183 }
184
185 self.next_block_index = self.sstable.meta.block_metas.len();
186 self.iter.take();
187 Ok(None)
188 }
189
190 pub(crate) fn init_block_iter(
191 &mut self,
192 buf: Bytes,
193 uncompressed_capacity: usize,
194 ) -> HummockResult<()> {
195 let block = Block::decode(buf, uncompressed_capacity)?;
196 let mut iter = BlockIterator::new(BlockHolder::from_owned_block(Box::new(block)));
197 iter.seek_to_first();
198 self.iter = Some(iter);
199 Ok(())
200 }
201
202 fn next_block_smallest(&self) -> &[u8] {
203 self.sstable.meta.block_metas[self.next_block_index]
204 .smallest_key
205 .as_ref()
206 }
207
208 fn next_block_largest(&self) -> &[u8] {
209 if self.next_block_index + 1 < self.sstable.meta.block_metas.len() {
210 self.sstable.meta.block_metas[self.next_block_index + 1]
211 .smallest_key
212 .as_ref()
213 } else {
214 self.sstable.meta.largest_key.as_ref()
215 }
216 }
217
218 fn current_block_largest(&self) -> Vec<u8> {
219 if self.next_block_index < self.sstable.meta.block_metas.len() {
220 let mut largest_key = FullKey::decode(
221 self.sstable.meta.block_metas[self.next_block_index]
222 .smallest_key
223 .as_ref(),
224 );
225 largest_key.epoch_with_gap = EpochWithGap::new_max_epoch();
227 largest_key.encode()
228 } else {
229 self.sstable.meta.largest_key.clone()
230 }
231 }
232
233 fn key(&self) -> FullKey<&[u8]> {
234 match self.iter.as_ref() {
235 Some(iter) => iter.key(),
236 None => FullKey::decode(
237 self.sstable.meta.block_metas[self.next_block_index]
238 .smallest_key
239 .as_ref(),
240 ),
241 }
242 }
243
244 pub(crate) fn is_valid(&self) -> bool {
245 self.iter.is_some() || self.next_block_index < self.sstable.meta.block_metas.len()
246 }
247
248 #[cfg(test)]
249 #[cfg(feature = "failpoints")]
250 pub(crate) fn iter_mut(&mut self) -> &mut BlockIterator {
251 self.iter.as_mut().unwrap()
252 }
253}
254
255impl Drop for BlockStreamIterator {
256 fn drop(&mut self) {
257 self.task_progress.dec_num_pending_read_io();
258 }
259}
260
261pub struct ConcatSstableIterator {
264 sstable_iter: Option<BlockStreamIterator>,
266
267 cur_idx: usize,
269
270 sstables: Vec<SstableInfo>,
272
273 sstable_store: SstableStoreRef,
274
275 stats: StoreLocalStatistic,
276 task_progress: Arc<TaskProgress>,
277
278 max_io_retry_times: usize,
279}
280
281impl ConcatSstableIterator {
282 pub fn new(
286 sst_infos: Vec<SstableInfo>,
287 sstable_store: SstableStoreRef,
288 task_progress: Arc<TaskProgress>,
289 max_io_retry_times: usize,
290 ) -> Self {
291 Self {
292 sstable_iter: None,
293 cur_idx: 0,
294 sstables: sst_infos,
295 sstable_store,
296 task_progress,
297 stats: StoreLocalStatistic::default(),
298 max_io_retry_times,
299 }
300 }
301
302 pub async fn rewind(&mut self) -> HummockResult<()> {
303 self.seek_idx(0).await
304 }
305
306 pub async fn next_sstable(&mut self) -> HummockResult<()> {
307 self.seek_idx(self.cur_idx + 1).await
308 }
309
310 pub fn current_sstable(&mut self) -> &mut BlockStreamIterator {
311 self.sstable_iter.as_mut().unwrap()
312 }
313
314 pub async fn init_block_iter(&mut self) -> HummockResult<()> {
315 if let Some(sstable) = self.sstable_iter.as_mut() {
316 if sstable.iter.is_some() {
317 return Ok(());
318 }
319 let (buf, _, meta) = sstable.download_next_block().await?.unwrap();
320 sstable.init_block_iter(buf, meta.uncompressed_size as usize)?;
321 }
322 Ok(())
323 }
324
325 pub fn is_valid(&self) -> bool {
326 self.cur_idx < self.sstables.len()
327 }
328
329 async fn seek_idx(&mut self, idx: usize) -> HummockResult<()> {
331 self.sstable_iter.take();
332 self.cur_idx = idx;
333 if self.cur_idx < self.sstables.len() {
334 let sstable_info = &self.sstables[self.cur_idx];
335 let sstable = self
336 .sstable_store
337 .sstable(sstable_info, &mut self.stats)
338 .instrument_await("stream_iter_sstable".verbose())
339 .await?;
340 self.task_progress.inc_num_pending_read_io();
341
342 let sstable_iter = BlockStreamIterator::new(
343 sstable,
344 self.task_progress.clone(),
345 self.sstable_store.clone(),
346 sstable_info.clone(),
347 self.max_io_retry_times,
348 self.stats.remote_io_time.clone(),
349 );
350 self.sstable_iter = Some(sstable_iter);
351 }
352 Ok(())
353 }
354}
355
356pub struct CompactorRunner<C: CompactionFilter = MultiCompactionFilter> {
357 left: Box<ConcatSstableIterator>,
358 right: Box<ConcatSstableIterator>,
359 task_id: u64,
360 executor: CompactTaskExecutor<
361 RemoteBuilderFactory<UnifiedSstableWriterFactory, BlockedXor16FilterBuilder>,
362 C,
363 >,
364 compression_algorithm: CompressionAlgorithm,
365 metrics: Arc<CompactorMetrics>,
366}
367
368impl<C: CompactionFilter> CompactorRunner<C> {
369 pub fn new(
370 context: CompactorContext,
371 task: CompactTask,
372 compaction_catalog_agent_ref: CompactionCatalogAgentRef,
373 object_id_getter: Arc<dyn GetObjectId>,
374 task_progress: Arc<TaskProgress>,
375 compaction_filter: C,
376 ) -> Self {
377 let mut options: SstableBuilderOptions = context.storage_opts.as_ref().into();
378 let compression_algorithm: CompressionAlgorithm = task.compression_algorithm.into();
379 options.compression_algorithm = compression_algorithm;
380 options.capacity = task.target_file_size as usize;
381 let estimated_output_key_count =
382 estimate_output_key_count_for_task(&task, options.capacity);
383 options.estimated_output_key_count = Some(estimated_output_key_count);
384 options.filter_hash_prealloc_key_count_cap =
385 blocked_xor_filter_key_count_threshold(task.blocked_xor_filter_kv_count_threshold);
386 options.max_vnode_key_range_bytes = None;
388 let get_id_time = Arc::new(AtomicU64::new(0));
389
390 debug_assert_eq!(
391 task.sstable_filter_kind,
392 PbSstableFilterType::SstableFilterXor16,
393 "fast compaction only supports blocked xor16 filter today"
394 );
395 debug_assert!(
396 task.should_use_block_based_filter_for_output(estimated_output_key_count as u64),
397 "fast compaction can only preserve blocked filters; expected blocked output"
398 );
399
400 let key_range = KeyRange::inf();
401 let read_table_ids = HashSet::from_iter(task.get_table_ids_from_input_ssts());
402
403 let task_config = TaskConfig {
404 key_range,
405 cache_policy: CachePolicy::NotFill,
406 gc_delete_keys: task.gc_delete_keys,
407 retain_multiple_version: false,
408 table_vnode_partition: task.table_vnode_partition.clone(),
409 use_block_based_filter: true,
410 sstable_filter_kind: task.sstable_filter_kind,
411 table_schemas: Default::default(),
412 disable_drop_column_optimization: false,
413 };
414 let factory = UnifiedSstableWriterFactory::new(context.sstable_store.clone());
415
416 let builder_factory = RemoteBuilderFactory::<_, BlockedXor16FilterBuilder> {
417 object_id_getter,
418 limiter: context.memory_limiter.clone(),
419 options,
420 policy: task_config.cache_policy,
421 remote_rpc_cost: get_id_time,
422 compaction_catalog_agent_ref: compaction_catalog_agent_ref.clone(),
423 sstable_writer_factory: factory,
424 _phantom: PhantomData,
425 };
426 let sst_builder = CapacitySplitTableBuilder::new(
427 builder_factory,
428 context.compactor_metrics.clone(),
429 Some(task_progress.clone()),
430 task_config.table_vnode_partition.clone(),
431 context
432 .storage_opts
433 .compactor_concurrent_uploading_sst_count,
434 compaction_catalog_agent_ref.clone(),
435 );
436 assert_eq!(
437 task.input_ssts.len(),
438 2,
439 "TaskId {} target_level {:?} task {:?}",
440 task.task_id,
441 task.target_level,
442 compact_task_to_string(&task)
443 );
444 let left_ssts = task.input_ssts[0]
445 .read_sstable_infos()
446 .cloned()
447 .collect_vec();
448 let right_ssts = task.input_ssts[1]
449 .read_sstable_infos()
450 .cloned()
451 .collect_vec();
452 assert!(
453 left_ssts
454 .iter()
455 .chain(right_ssts.iter())
456 .all(|sst| sst.bloom_filter_kind == BloomFilterType::Blocked),
457 "fast compaction requires blocked-filter SSTs: {}",
458 compact_task_to_string(&task)
459 );
460 let left = Box::new(ConcatSstableIterator::new(
461 left_ssts,
462 context.sstable_store.clone(),
463 task_progress.clone(),
464 context.storage_opts.compactor_iter_max_io_retry_times,
465 ));
466 let right = Box::new(ConcatSstableIterator::new(
467 right_ssts,
468 context.sstable_store,
469 task_progress.clone(),
470 context.storage_opts.compactor_iter_max_io_retry_times,
471 ));
472
473 let pk_prefix_state = PkPrefixSkipWatermarkState::from_safe_epoch_watermarks(
475 task.pk_prefix_table_watermarks.clone(),
476 );
477 let non_pk_prefix_state = NonPkPrefixSkipWatermarkState::from_safe_epoch_watermarks(
478 task.non_pk_prefix_table_watermarks.clone(),
479 compaction_catalog_agent_ref.clone(),
480 );
481 let value_skip_watermark_state = ValueSkipWatermarkState::from_safe_epoch_watermarks(
482 task.value_table_watermarks.clone(),
483 compaction_catalog_agent_ref,
484 );
485
486 Self {
487 executor: CompactTaskExecutor::new(
488 sst_builder,
489 task_config,
490 task_progress,
491 pk_prefix_state,
492 non_pk_prefix_state,
493 value_skip_watermark_state,
494 compaction_filter,
495 read_table_ids,
496 ),
497 left,
498 right,
499 task_id: task.task_id,
500 metrics: context.compactor_metrics,
501 compression_algorithm,
502 }
503 }
504
505 pub async fn run(mut self) -> HummockResult<(Vec<LocalSstableInfo>, CompactionStatistics)> {
506 self.left.rewind().await?;
507 self.right.rewind().await?;
508 let mut skip_raw_block_count = 0;
509 let mut skip_raw_block_size = 0;
510 while self.left.is_valid() && self.right.is_valid() {
511 let ret = self
512 .left
513 .current_sstable()
514 .key()
515 .cmp(&self.right.current_sstable().key());
516 let (first, second) = if ret == Ordering::Less {
517 (&mut self.left, &mut self.right)
518 } else {
519 (&mut self.right, &mut self.left)
520 };
521 assert!(
522 ret != Ordering::Equal,
523 "sst range overlap equal_key {:?}",
524 self.left.current_sstable().key()
525 );
526 if first.current_sstable().iter.is_none() {
527 let right_key = second.current_sstable().key();
528 while first.current_sstable().is_valid() && !self.executor.builder.need_flush() {
529 let full_key = FullKey::decode(first.current_sstable().next_block_largest());
530 if full_key.user_key.ge(&right_key.user_key) {
533 break;
534 }
535 let smallest_key =
536 FullKey::decode(first.current_sstable().next_block_smallest());
537 if !self.executor.shall_copy_raw_block(&smallest_key) {
538 break;
539 }
540 let smallest_key = smallest_key.to_vec();
541
542 let (mut block, filter_data, mut meta) = first
543 .current_sstable()
544 .download_next_block()
545 .await?
546 .unwrap();
547 let algorithm = Block::get_algorithm(&block)?;
548 if algorithm == CompressionAlgorithm::None
549 && algorithm != self.compression_algorithm
550 {
551 block = BlockBuilder::compress_block(block, self.compression_algorithm)?;
552 meta.len = block.len() as u32;
553 }
554
555 let largest_key = first.current_sstable().current_block_largest();
556 let block_len = block.len() as u64;
557 let block_key_count = meta.total_key_count;
558
559 if self
560 .executor
561 .builder
562 .add_raw_block(block, filter_data, smallest_key, largest_key, meta)
563 .await?
564 {
565 skip_raw_block_size += block_len;
566 skip_raw_block_count += 1;
567 }
568 self.executor.may_report_process_key(block_key_count);
569 self.executor.clear();
570 }
571 if !first.current_sstable().is_valid() {
572 first.next_sstable().await?;
573 continue;
574 }
575 first.init_block_iter().await?;
576 }
577
578 let target_key = second.current_sstable().key();
579 let iter = first.sstable_iter.as_mut().unwrap().iter.as_mut().unwrap();
580 self.executor.reset_watermark();
581 self.executor.run(iter, target_key).await?;
582 if !iter.is_valid() {
583 first.sstable_iter.as_mut().unwrap().iter.take();
584 if !first.current_sstable().is_valid() {
585 first.next_sstable().await?;
586 }
587 }
588 }
589 let rest_data = if !self.left.is_valid() {
590 &mut self.right
591 } else {
592 &mut self.left
593 };
594 if rest_data.is_valid() {
595 let sstable_iter = rest_data.sstable_iter.as_mut().unwrap();
597 let target_key = FullKey::decode(&sstable_iter.sstable.meta.largest_key);
598 if let Some(iter) = sstable_iter.iter.as_mut() {
599 self.executor.reset_watermark();
600 self.executor.run(iter, target_key).await?;
601 assert!(
602 !iter.is_valid(),
603 "iter should not be valid key {:?}",
604 iter.key()
605 );
606 }
607 sstable_iter.iter.take();
608 }
609
610 while rest_data.is_valid() {
611 let mut sstable_iter = rest_data.sstable_iter.take().unwrap();
612 while sstable_iter.is_valid() {
613 let smallest_key = FullKey::decode(sstable_iter.next_block_smallest()).to_vec();
614 let (block, filter_data, block_meta) =
615 sstable_iter.download_next_block().await?.unwrap();
616 let need_deleted = self.executor.last_key.user_key.eq(&smallest_key.user_key)
618 && self.executor.last_key_is_delete;
619 if self.executor.builder.need_flush()
620 || need_deleted
621 || !self.executor.shall_copy_raw_block(&smallest_key.to_ref())
622 {
623 let largest_key = sstable_iter.sstable.meta.largest_key.clone();
624 let target_key = FullKey::decode(&largest_key);
625 sstable_iter.init_block_iter(block, block_meta.uncompressed_size as usize)?;
626 let mut iter = sstable_iter.iter.take().unwrap();
627 self.executor.reset_watermark();
628 self.executor.run(&mut iter, target_key).await?;
629 } else {
630 let largest_key = sstable_iter.current_block_largest();
631 let block_len = block.len() as u64;
632 let block_key_count = block_meta.total_key_count;
633 if self
634 .executor
635 .builder
636 .add_raw_block(block, filter_data, smallest_key, largest_key, block_meta)
637 .await?
638 {
639 skip_raw_block_count += 1;
640 skip_raw_block_size += block_len;
641 }
642 self.executor.may_report_process_key(block_key_count);
643 self.executor.clear();
644 }
645 }
646 rest_data.next_sstable().await?;
647 }
648 let mut total_read_bytes = 0;
649 for sst in &self.left.sstables {
650 total_read_bytes += sst.sst_size;
651 }
652 for sst in &self.right.sstables {
653 total_read_bytes += sst.sst_size;
654 }
655 self.metrics
656 .compact_fast_runner_bytes
657 .inc_by(skip_raw_block_size);
658 tracing::info!(
659 "OPTIMIZATION: skip {} blocks for task-{}, optimize {}% data compression",
660 skip_raw_block_count,
661 self.task_id,
662 skip_raw_block_size * 100 / total_read_bytes,
663 );
664
665 let statistic = self.executor.take_statistics();
666 let output_ssts = self.executor.builder.finish().await?;
667 Compactor::report_progress(
668 self.metrics.clone(),
669 Some(self.executor.task_progress.clone()),
670 &output_ssts,
671 false,
672 );
673 let sst_infos = output_ssts
674 .iter()
675 .map(|sst| sst.sst_info.clone())
676 .collect_vec();
677 assert!(can_concat(&sst_infos));
678 Ok((output_ssts, statistic))
679 }
680}
681
682pub struct CompactTaskExecutor<F: TableBuilderFactory, C: CompactionFilter> {
683 last_key: FullKey<Vec<u8>>,
684 compaction_statistics: CompactionStatistics,
685 last_table_id: Option<TableId>,
686 last_table_stats: TableStats,
687 builder: CapacitySplitTableBuilder<F>,
688 task_config: TaskConfig,
689 task_progress: Arc<TaskProgress>,
690 pk_prefix_skip_watermark_state: PkPrefixSkipWatermarkState,
691 last_key_is_delete: bool,
692 progress_key_num: u32,
693 non_pk_prefix_skip_watermark_state: NonPkPrefixSkipWatermarkState,
694 value_skip_watermark_state: ValueSkipWatermarkState,
695 compaction_filter: C,
696 read_table_ids: HashSet<TableId>,
697}
698
699impl<F: TableBuilderFactory, C: CompactionFilter> CompactTaskExecutor<F, C> {
700 pub fn new(
701 builder: CapacitySplitTableBuilder<F>,
702 task_config: TaskConfig,
703 task_progress: Arc<TaskProgress>,
704 pk_prefix_skip_watermark_state: PkPrefixSkipWatermarkState,
705 non_pk_prefix_skip_watermark_state: NonPkPrefixSkipWatermarkState,
706 value_skip_watermark_state: ValueSkipWatermarkState,
707 compaction_filter: C,
708 read_table_ids: HashSet<TableId>,
709 ) -> Self {
710 Self {
711 builder,
712 task_config,
713 last_key: FullKey::default(),
714 last_key_is_delete: false,
715 compaction_statistics: CompactionStatistics::default(),
716 last_table_id: None,
717 last_table_stats: TableStats::default(),
718 task_progress,
719 pk_prefix_skip_watermark_state,
720 progress_key_num: 0,
721 non_pk_prefix_skip_watermark_state,
722 value_skip_watermark_state,
723 compaction_filter,
724 read_table_ids,
725 }
726 }
727
728 fn take_statistics(&mut self) -> CompactionStatistics {
729 if let Some(last_table_id) = self.last_table_id.take() {
730 self.compaction_statistics
731 .delta_drop_stat
732 .insert(last_table_id, std::mem::take(&mut self.last_table_stats));
733 }
734 std::mem::take(&mut self.compaction_statistics)
735 }
736
737 fn clear(&mut self) {
738 if !self.last_key.is_empty() {
739 self.last_key = FullKey::default();
740 }
741 self.last_key_is_delete = false;
742 }
743
744 fn reset_watermark(&mut self) {
745 self.pk_prefix_skip_watermark_state.reset_watermark();
746 self.non_pk_prefix_skip_watermark_state.reset_watermark();
747 self.value_skip_watermark_state.reset_watermark();
748 }
749
750 #[inline(always)]
751 fn should_skip_block(&self, table_id: TableId) -> bool {
752 !self.read_table_ids.contains(&table_id)
753 }
754
755 #[inline(always)]
756 fn may_report_process_key(&mut self, key_count: u32) {
757 const PROGRESS_KEY_INTERVAL: u32 = 100;
758 self.progress_key_num += key_count;
759 if self.progress_key_num > PROGRESS_KEY_INTERVAL {
760 self.task_progress
761 .inc_progress_key(self.progress_key_num as u64);
762 self.progress_key_num = 0;
763 }
764 }
765
766 pub async fn run(
767 &mut self,
768 iter: &mut BlockIterator,
769 target_key: FullKey<&[u8]>,
770 ) -> HummockResult<()> {
771 if self.should_skip_block(iter.table_id()) {
772 iter.finish_block();
773 return Ok(());
774 }
775
776 while iter.is_valid() && iter.key().le(&target_key) {
777 let is_new_user_key =
778 !self.last_key.is_empty() && iter.key().user_key != self.last_key.user_key.as_ref();
779 self.compaction_statistics.iter_total_key_counts += 1;
780 self.may_report_process_key(1);
781
782 let mut drop = false;
783 let value = HummockValue::from_slice(iter.value()).unwrap();
784 let is_first_or_new_user_key = is_new_user_key || self.last_key.is_empty();
785 if is_first_or_new_user_key {
786 self.last_key.set(iter.key());
787 self.last_key_is_delete = false;
788 }
789
790 if !self.task_config.retain_multiple_version
792 && self.task_config.gc_delete_keys
793 && value.is_delete()
794 {
795 drop = true;
796 self.last_key_is_delete = true;
797 } else if !self.task_config.retain_multiple_version && !is_first_or_new_user_key {
798 drop = true;
799 }
800
801 if !drop && self.compaction_filter.should_delete(iter.key()) {
802 drop = true;
803 }
804
805 if !drop && self.watermark_should_delete(&iter.key(), value) {
806 drop = true;
807 self.last_key_is_delete = true;
808 }
809
810 if self.last_table_id != Some(self.last_key.user_key.table_id) {
811 if let Some(last_table_id) = self.last_table_id.take() {
812 self.compaction_statistics
813 .delta_drop_stat
814 .insert(last_table_id, std::mem::take(&mut self.last_table_stats));
815 }
816 self.last_table_id = Some(self.last_key.user_key.table_id);
817 }
818
819 if drop {
820 self.compaction_statistics.iter_drop_key_counts += 1;
821
822 self.last_table_stats.total_key_count -= 1;
823 self.last_table_stats.total_key_size -= self.last_key.encoded_len() as i64;
824 self.last_table_stats.total_value_size -= value.encoded_len() as i64;
825 iter.next();
826 continue;
827 }
828 self.builder
829 .add_full_key(iter.key(), value, is_new_user_key)
830 .await?;
831 iter.next();
832 }
833 Ok(())
834 }
835
836 pub fn shall_copy_raw_block(&mut self, smallest_key: &FullKey<&[u8]>) -> bool {
837 if self.should_skip_block(smallest_key.user_key.table_id) {
838 return false;
840 }
841
842 if self.last_key_is_delete && self.last_key.user_key.as_ref().eq(&smallest_key.user_key) {
843 return false;
846 }
847
848 if self.watermark_may_delete(smallest_key) {
849 return false;
850 }
851
852 if self.compaction_filter.should_delete(*smallest_key) {
854 return false;
855 }
856
857 true
858 }
859
860 fn watermark_may_delete(&mut self, key: &FullKey<&[u8]>) -> bool {
861 let pk_prefix_has_watermark = self.pk_prefix_skip_watermark_state.has_watermark();
863 let non_pk_prefix_has_watermark = self.non_pk_prefix_skip_watermark_state.has_watermark();
864 if pk_prefix_has_watermark || non_pk_prefix_has_watermark {
865 let unused = vec![];
866 let unused_put = HummockValue::Put(unused.as_slice());
867 if (pk_prefix_has_watermark
868 && self
869 .pk_prefix_skip_watermark_state
870 .should_delete(key, unused_put))
871 || (non_pk_prefix_has_watermark
872 && self
873 .non_pk_prefix_skip_watermark_state
874 .should_delete(key, unused_put))
875 {
876 return true;
877 }
878 }
879 self.value_skip_watermark_state.has_watermark()
880 && self.value_skip_watermark_state.may_delete(key)
881 }
882
883 fn watermark_should_delete(
884 &mut self,
885 key: &FullKey<&[u8]>,
886 value: HummockValue<&[u8]>,
887 ) -> bool {
888 (self.pk_prefix_skip_watermark_state.has_watermark()
889 && self
890 .pk_prefix_skip_watermark_state
891 .should_delete(key, value))
892 || (self.non_pk_prefix_skip_watermark_state.has_watermark()
893 && self
894 .non_pk_prefix_skip_watermark_state
895 .should_delete(key, value))
896 || (self.value_skip_watermark_state.has_watermark()
897 && self.value_skip_watermark_state.should_delete(key, value))
898 }
899}
900
#[cfg(test)]
mod tests {
    use std::collections::{HashMap, VecDeque};
    use std::sync::Arc;

    use risingwave_common::catalog::TableId;
    use risingwave_common::hash::VirtualNode;
    use risingwave_common::util::epoch::test_epoch;
    use risingwave_hummock_sdk::compact_task::CompactTask;
    use risingwave_hummock_sdk::key::FullKey;
    use risingwave_hummock_sdk::level::InputLevel;
    use risingwave_pb::hummock::compact_task::TaskType;
    use risingwave_pb::hummock::{BloomFilterType, LevelType, PbSstableFilterType};

    use super::CompactorRunner;
    use crate::compaction_catalog_manager::CompactionCatalogAgent;
    use crate::hummock::compactor::compaction_utils::optimize_by_copy_block;
    use crate::hummock::compactor::task_progress::TaskProgress;
    use crate::hummock::compactor::{CompactorContext, MultiCompactionFilter};
    use crate::hummock::iterator::test_utils::mock_sstable_store;
    use crate::hummock::test_utils::{
        default_builder_opt_for_test, default_opts_for_test, gen_test_sstable_impl, test_value_of,
    };
    use crate::hummock::value::HummockValue;
    use crate::hummock::{
        BlockedXor16FilterBuilder, CachePolicy, SharedComapctorObjectIdManager, Xor16FilterBuilder,
    };
    use crate::monitor::CompactorMetrics;

    /// Builds the full key `key_test_{idx:05}` on vnode zero of `table_id` at test epoch 1.
    fn test_key(table_id: u32, idx: usize) -> FullKey<Vec<u8>> {
        let suffix = format!("key_test_{idx:05}");
        let mut table_key = VirtualNode::ZERO.to_be_bytes().to_vec();
        table_key.extend_from_slice(suffix.as_bytes());
        FullKey::for_test(TableId::new(table_id), table_key, test_epoch(1))
    }

    /// An SST whose table id list is empty must not be read by the fast compactor: the level
    /// exposes only one readable SST and the runner finishes successfully.
    #[tokio::test]
    async fn test_fast_compact_skips_empty_table_id_sst() {
        let sstable_store = mock_sstable_store().await;
        let table_id_to_vnode = HashMap::from([
            (1, VirtualNode::COUNT_FOR_TEST),
            (2, VirtualNode::COUNT_FOR_TEST),
        ]);
        let table_id_to_watermark_serde = HashMap::from([(1, None), (2, None)]);

        // SST of table 1, built with a non-blocked filter, whose table ids are cleared below.
        let mut dropped_only_sst = gen_test_sstable_impl::<_, Xor16FilterBuilder>(
            default_builder_opt_for_test(),
            1,
            (0..2).map(|idx| (test_key(1, idx), HummockValue::put(test_value_of(idx)))),
            sstable_store.clone(),
            CachePolicy::NotFill,
            table_id_to_vnode.clone(),
            table_id_to_watermark_serde.clone(),
        )
        .await;
        assert_eq!(dropped_only_sst.bloom_filter_kind, BloomFilterType::Sstable);
        let mut stripped_inner = dropped_only_sst.get_inner();
        stripped_inner.table_ids.clear();
        dropped_only_sst.set_inner(stripped_inner);

        // Two blocked-filter SSTs of table 2 with disjoint key ranges.
        let live_left_sst = gen_test_sstable_impl::<_, BlockedXor16FilterBuilder>(
            default_builder_opt_for_test(),
            2,
            (0..2).map(|idx| (test_key(2, idx), HummockValue::put(test_value_of(idx)))),
            sstable_store.clone(),
            CachePolicy::NotFill,
            table_id_to_vnode.clone(),
            table_id_to_watermark_serde.clone(),
        )
        .await;
        let live_right_sst = gen_test_sstable_impl::<_, BlockedXor16FilterBuilder>(
            default_builder_opt_for_test(),
            3,
            (2..4).map(|idx| (test_key(2, idx), HummockValue::put(test_value_of(idx)))),
            sstable_store.clone(),
            CachePolicy::NotFill,
            table_id_to_vnode,
            table_id_to_watermark_serde,
        )
        .await;

        let mut storage_opts = default_opts_for_test();
        storage_opts.enable_fast_compaction = true;
        storage_opts.compactor_fast_max_compact_task_size = u64::MAX;
        storage_opts.compactor_fast_max_compact_delete_ratio = 100;
        let context = CompactorContext::new_local_compact_context(
            Arc::new(storage_opts),
            sstable_store,
            Arc::new(CompactorMetrics::unused()),
            None,
        );

        let upper_level = InputLevel {
            level_idx: 1,
            level_type: LevelType::Nonoverlapping,
            table_infos: vec![dropped_only_sst, live_left_sst],
        };
        let lower_level = InputLevel {
            level_idx: 2,
            level_type: LevelType::Nonoverlapping,
            table_infos: vec![live_right_sst],
        };
        let task = CompactTask {
            input_ssts: vec![upper_level, lower_level],
            task_id: 42,
            target_level: 2,
            existing_table_ids: vec![TableId::new(2)],
            target_file_size: 1 << 20,
            task_type: TaskType::Dynamic,
            blocked_xor_filter_kv_count_threshold: Some(0),
            sstable_filter_kind: PbSstableFilterType::SstableFilterXor16,
            ..Default::default()
        };

        // Only the live SST of the first level is readable.
        assert_eq!(task.input_ssts[0].read_sstable_infos().count(), 1);
        assert!(optimize_by_copy_block(&task, &context));

        let runner = CompactorRunner::new(
            context,
            task,
            CompactionCatalogAgent::for_test(vec![1, 2]),
            SharedComapctorObjectIdManager::for_test(VecDeque::from([100])),
            Arc::new(TaskProgress::default()),
            MultiCompactionFilter::default(),
        );
        runner.run().await.unwrap();
    }
}