1use std::cmp::Ordering;
16use std::collections::HashSet;
17use std::marker::PhantomData;
18use std::sync::atomic::AtomicU64;
19use std::sync::{Arc, atomic};
20use std::time::Instant;
21
22use await_tree::{InstrumentAwait, SpanExt};
23use bytes::Bytes;
24use fail::fail_point;
25use itertools::Itertools;
26use risingwave_common::catalog::TableId;
27use risingwave_hummock_sdk::compact_task::CompactTask;
28use risingwave_hummock_sdk::key::FullKey;
29use risingwave_hummock_sdk::key_range::KeyRange;
30use risingwave_hummock_sdk::sstable_info::SstableInfo;
31use risingwave_hummock_sdk::table_stats::TableStats;
32use risingwave_hummock_sdk::{EpochWithGap, LocalSstableInfo, can_concat, compact_task_to_string};
33use risingwave_pb::hummock::PbSstableFilterLayout;
34
35use crate::compaction_catalog_manager::CompactionCatalogAgentRef;
36use crate::hummock::block_stream::BlockDataStream;
37use crate::hummock::compactor::compaction_utils::{
38 blocked_xor_filter_key_count_threshold, estimate_output_key_count_for_task,
39};
40use crate::hummock::compactor::task_progress::TaskProgress;
41use crate::hummock::compactor::{
42 CompactionFilter, CompactionStatistics, Compactor, CompactorContext, MultiCompactionFilter,
43 RemoteBuilderFactory, TaskConfig,
44};
45use crate::hummock::iterator::{
46 NonPkPrefixSkipWatermarkState, PkPrefixSkipWatermarkState, SkipWatermarkState,
47 ValueSkipWatermarkState,
48};
49use crate::hummock::multi_builder::{CapacitySplitTableBuilder, TableBuilderFactory};
50use crate::hummock::sstable_store::SstableStoreRef;
51use crate::hummock::value::HummockValue;
52use crate::hummock::{
53 Block, BlockBuilder, BlockHolder, BlockIterator, BlockMeta, BlockedXor16FilterBuilder,
54 CachePolicy, CompressionAlgorithm, FilterBuilder, GetObjectId, HummockResult,
55 SstableBuilderOptions, TableHolder, UnifiedSstableWriterFactory,
56};
57use crate::monitor::{CompactorMetrics, StoreLocalStatistic};
58
/// Reads the blocks of one SSTable sequentially through a block data stream.
pub struct BlockStreamIterator {
    /// Stream over the not-yet-downloaded blocks. It starts as `None`, is created on demand
    /// by `create_stream`, and is dropped again when a retryable I/O error occurs.
    block_stream: Option<BlockDataStream>,

    /// Index into `sstable.meta.block_metas` of the next block to download.
    next_block_index: usize,

    /// Holder of the SSTable (metadata and filter reader) being iterated.
    sstable: TableHolder,
    /// Iterator over the block most recently passed to `init_block_iter`, if any.
    iter: Option<BlockIterator>,
    /// Shared task progress; its pending-read-IO counter is decremented when this value drops.
    task_progress: Arc<TaskProgress>,

    /// Store used to open the block stream.
    sstable_store: SstableStoreRef,
    /// Info of the SSTable being read; used for the object id and for retry log messages.
    sstable_info: SstableInfo,
    /// Number of stream re-creations performed so far after object errors.
    io_retry_times: usize,
    /// Upper bound for `io_retry_times`; once reached, object errors are returned to the caller.
    max_io_retry_times: usize,
    /// Accumulator (in milliseconds) for the time spent inside `download_next_block`.
    stats_ptr: Arc<AtomicU64>,
}
78
79impl BlockStreamIterator {
80 pub fn new(
95 sstable: TableHolder,
96 task_progress: Arc<TaskProgress>,
97 sstable_store: SstableStoreRef,
98 sstable_info: SstableInfo,
99 max_io_retry_times: usize,
100 stats_ptr: Arc<AtomicU64>,
101 ) -> Self {
102 Self {
103 block_stream: None,
104 next_block_index: 0,
105 sstable,
106 iter: None,
107 task_progress,
108 sstable_store,
109 sstable_info,
110 io_retry_times: 0,
111 max_io_retry_times,
112 stats_ptr,
113 }
114 }
115
116 async fn create_stream(&mut self) -> HummockResult<()> {
117 let block_stream = self
120 .sstable_store
121 .get_stream_for_blocks(
122 self.sstable_info.object_id,
123 &self.sstable.meta.block_metas[self.next_block_index..],
124 )
125 .instrument_await("stream_iter_get_stream".verbose())
126 .await?;
127 self.block_stream = Some(block_stream);
128 Ok(())
129 }
130
131 pub(crate) async fn download_next_block(
133 &mut self,
134 ) -> HummockResult<Option<(Bytes, Vec<u8>, BlockMeta)>> {
135 let now = Instant::now();
136 let _time_stat = scopeguard::guard(self.stats_ptr.clone(), |stats_ptr: Arc<AtomicU64>| {
137 let add = (now.elapsed().as_secs_f64() * 1000.0).ceil();
138 stats_ptr.fetch_add(add as u64, atomic::Ordering::Relaxed);
139 });
140 loop {
141 let ret = match &mut self.block_stream {
142 Some(block_stream) => block_stream.next_block_impl().await,
143 None => {
144 self.create_stream().await?;
145 continue;
146 }
147 };
148 match ret {
149 Ok(Some((data, _))) => {
150 let meta = self.sstable.meta.block_metas[self.next_block_index].clone();
151 let filter_block = self
152 .sstable
153 .filter_reader
154 .get_block_raw_filter(self.next_block_index);
155 self.next_block_index += 1;
156 return Ok(Some((data, filter_block, meta)));
157 }
158
159 Ok(None) => break,
160
161 Err(e) => {
162 if !e.is_object_error() || self.io_retry_times >= self.max_io_retry_times {
163 return Err(e);
164 }
165
166 self.block_stream.take();
167 self.io_retry_times += 1;
168 fail_point!("create_stream_err");
169
170 tracing::warn!(
171 "fast compact retry create stream for sstable {} times, sstinfo={}",
172 self.io_retry_times,
173 format!(
174 "object_id={}, sst_id={}, meta_offset={}, table_ids={:?}",
175 self.sstable_info.object_id,
176 self.sstable_info.sst_id,
177 self.sstable_info.meta_offset,
178 self.sstable_info.table_ids
179 )
180 );
181 }
182 }
183 }
184
185 self.next_block_index = self.sstable.meta.block_metas.len();
186 self.iter.take();
187 Ok(None)
188 }
189
190 pub(crate) fn init_block_iter(
191 &mut self,
192 buf: Bytes,
193 uncompressed_capacity: usize,
194 ) -> HummockResult<()> {
195 let block = Block::decode(buf, uncompressed_capacity)?;
196 let mut iter = BlockIterator::new(BlockHolder::from_owned_block(Box::new(block)));
197 iter.seek_to_first();
198 self.iter = Some(iter);
199 Ok(())
200 }
201
202 fn next_block_smallest(&self) -> &[u8] {
203 self.sstable.meta.block_metas[self.next_block_index]
204 .smallest_key
205 .as_ref()
206 }
207
208 fn next_block_largest(&self) -> &[u8] {
209 if self.next_block_index + 1 < self.sstable.meta.block_metas.len() {
210 self.sstable.meta.block_metas[self.next_block_index + 1]
211 .smallest_key
212 .as_ref()
213 } else {
214 self.sstable.meta.largest_key.as_ref()
215 }
216 }
217
218 fn current_block_largest(&self) -> Vec<u8> {
219 if self.next_block_index < self.sstable.meta.block_metas.len() {
220 let mut largest_key = FullKey::decode(
221 self.sstable.meta.block_metas[self.next_block_index]
222 .smallest_key
223 .as_ref(),
224 );
225 largest_key.epoch_with_gap = EpochWithGap::new_max_epoch();
227 largest_key.encode()
228 } else {
229 self.sstable.meta.largest_key.clone()
230 }
231 }
232
233 fn key(&self) -> FullKey<&[u8]> {
234 match self.iter.as_ref() {
235 Some(iter) => iter.key(),
236 None => FullKey::decode(
237 self.sstable.meta.block_metas[self.next_block_index]
238 .smallest_key
239 .as_ref(),
240 ),
241 }
242 }
243
244 pub(crate) fn is_valid(&self) -> bool {
245 self.iter.is_some() || self.next_block_index < self.sstable.meta.block_metas.len()
246 }
247
248 #[cfg(test)]
249 #[cfg(feature = "failpoints")]
250 pub(crate) fn iter_mut(&mut self) -> &mut BlockIterator {
251 self.iter.as_mut().unwrap()
252 }
253}
254
/// Releases the pending-read-IO slot held by this iterator.
impl Drop for BlockStreamIterator {
    fn drop(&mut self) {
        // Balances the `inc_num_pending_read_io` call made by `ConcatSstableIterator::seek_idx`
        // right before it constructs a `BlockStreamIterator`.
        self.task_progress.dec_num_pending_read_io();
    }
}
260
/// Walks a list of SSTables one at a time, exposing a [`BlockStreamIterator`] for the
/// current one.
pub struct ConcatSstableIterator {
    /// Block-level iterator over the current SSTable; `None` when `cur_idx` is out of range
    /// or before the first seek.
    sstable_iter: Option<BlockStreamIterator>,

    /// Index into `sstables` of the current SSTable.
    cur_idx: usize,

    /// The SSTables to iterate, in iteration order.
    sstables: Vec<SstableInfo>,

    /// Store used to load SSTables and open block streams.
    sstable_store: SstableStoreRef,

    /// Local statistics passed to the store when loading SSTables; its `remote_io_time`
    /// counter is shared with each `BlockStreamIterator`.
    stats: StoreLocalStatistic,
    /// Shared task progress, used to track pending read IO.
    task_progress: Arc<TaskProgress>,

    /// Retry budget handed to every `BlockStreamIterator` created by this iterator.
    max_io_retry_times: usize,
}
280
281impl ConcatSstableIterator {
282 pub fn new(
286 sst_infos: Vec<SstableInfo>,
287 sstable_store: SstableStoreRef,
288 task_progress: Arc<TaskProgress>,
289 max_io_retry_times: usize,
290 ) -> Self {
291 Self {
292 sstable_iter: None,
293 cur_idx: 0,
294 sstables: sst_infos,
295 sstable_store,
296 task_progress,
297 stats: StoreLocalStatistic::default(),
298 max_io_retry_times,
299 }
300 }
301
302 pub async fn rewind(&mut self) -> HummockResult<()> {
303 self.seek_idx(0).await
304 }
305
306 pub async fn next_sstable(&mut self) -> HummockResult<()> {
307 self.seek_idx(self.cur_idx + 1).await
308 }
309
310 pub fn current_sstable(&mut self) -> &mut BlockStreamIterator {
311 self.sstable_iter.as_mut().unwrap()
312 }
313
314 pub async fn init_block_iter(&mut self) -> HummockResult<()> {
315 if let Some(sstable) = self.sstable_iter.as_mut() {
316 if sstable.iter.is_some() {
317 return Ok(());
318 }
319 let (buf, _, meta) = sstable.download_next_block().await?.unwrap();
320 sstable.init_block_iter(buf, meta.uncompressed_size as usize)?;
321 }
322 Ok(())
323 }
324
325 pub fn is_valid(&self) -> bool {
326 self.cur_idx < self.sstables.len()
327 }
328
329 async fn seek_idx(&mut self, idx: usize) -> HummockResult<()> {
331 self.sstable_iter.take();
332 self.cur_idx = idx;
333 if self.cur_idx < self.sstables.len() {
334 let sstable_info = &self.sstables[self.cur_idx];
335 let sstable = self
336 .sstable_store
337 .sstable(sstable_info, &mut self.stats)
338 .instrument_await("stream_iter_sstable".verbose())
339 .await?;
340 self.task_progress.inc_num_pending_read_io();
341
342 let sstable_iter = BlockStreamIterator::new(
343 sstable,
344 self.task_progress.clone(),
345 self.sstable_store.clone(),
346 sstable_info.clone(),
347 self.max_io_retry_times,
348 self.stats.remote_io_time.clone(),
349 );
350 self.sstable_iter = Some(sstable_iter);
351 }
352 Ok(())
353 }
354}
355
/// Runner for a fast compaction task that merges two input levels, copying raw blocks
/// through unchanged where possible and re-iterating key by key otherwise.
pub struct CompactorRunner<
    B: FilterBuilder = BlockedXor16FilterBuilder,
    C: CompactionFilter = MultiCompactionFilter,
> {
    /// Iterator over the SSTables of the first input level (`task.input_ssts[0]`).
    left: Box<ConcatSstableIterator>,
    /// Iterator over the SSTables of the second input level (`task.input_ssts[1]`).
    right: Box<ConcatSstableIterator>,
    /// Id of the compaction task, used in log output.
    task_id: u64,
    /// Executor that applies drop rules key by key and owns the output builder.
    executor: CompactTaskExecutor<RemoteBuilderFactory<UnifiedSstableWriterFactory, B>, C>,
    /// Compression algorithm requested by the task for output blocks.
    compression_algorithm: CompressionAlgorithm,
    /// Compactor metrics sink.
    metrics: Arc<CompactorMetrics>,
}
367
368impl<B: FilterBuilder, C: CompactionFilter> CompactorRunner<B, C> {
369 pub fn new(
370 context: CompactorContext,
371 task: CompactTask,
372 compaction_catalog_agent_ref: CompactionCatalogAgentRef,
373 object_id_getter: Arc<dyn GetObjectId>,
374 task_progress: Arc<TaskProgress>,
375 compaction_filter: C,
376 ) -> Self {
377 let mut options: SstableBuilderOptions = context.storage_opts.as_ref().into();
378 let compression_algorithm: CompressionAlgorithm = task.compression_algorithm.into();
379 options.compression_algorithm = compression_algorithm;
380 options.capacity = task.target_file_size as usize;
381 let estimated_output_key_count =
382 estimate_output_key_count_for_task(&task, options.capacity);
383 options.estimated_output_key_count = Some(estimated_output_key_count);
384 options.filter_hash_prealloc_key_count_cap =
385 blocked_xor_filter_key_count_threshold(task.blocked_xor_filter_kv_count_threshold);
386 options.max_vnode_key_range_bytes = None;
388 let get_id_time = Arc::new(AtomicU64::new(0));
389
390 let key_range = KeyRange::inf();
391 let read_table_ids = HashSet::from_iter(task.get_table_ids_from_input_ssts());
392
393 let task_config = TaskConfig {
394 key_range,
395 cache_policy: CachePolicy::NotFill,
396 gc_delete_keys: task.gc_delete_keys,
397 retain_multiple_version: false,
398 table_vnode_partition: task.table_vnode_partition.clone(),
399 sstable_filter_layout: PbSstableFilterLayout::Blocked,
400 sstable_filter_type: task.sstable_filter_type,
401 table_schemas: Default::default(),
402 disable_drop_column_optimization: false,
403 };
404 let factory = UnifiedSstableWriterFactory::new(context.sstable_store.clone());
405
406 let builder_factory = RemoteBuilderFactory::<_, B> {
407 object_id_getter,
408 limiter: context.memory_limiter.clone(),
409 options,
410 policy: task_config.cache_policy,
411 remote_rpc_cost: get_id_time,
412 compaction_catalog_agent_ref: compaction_catalog_agent_ref.clone(),
413 sstable_writer_factory: factory,
414 _phantom: PhantomData,
415 };
416 let sst_builder = CapacitySplitTableBuilder::new(
417 builder_factory,
418 context.compactor_metrics.clone(),
419 Some(task_progress.clone()),
420 task_config.table_vnode_partition.clone(),
421 context
422 .storage_opts
423 .compactor_concurrent_uploading_sst_count,
424 compaction_catalog_agent_ref.clone(),
425 );
426 assert_eq!(
427 task.input_ssts.len(),
428 2,
429 "TaskId {} target_level {:?} task {:?}",
430 task.task_id,
431 task.target_level,
432 compact_task_to_string(&task)
433 );
434 let left_ssts = task.input_ssts[0]
435 .read_sstable_infos()
436 .cloned()
437 .collect_vec();
438 let right_ssts = task.input_ssts[1]
439 .read_sstable_infos()
440 .cloned()
441 .collect_vec();
442 assert!(
443 left_ssts
444 .iter()
445 .chain(right_ssts.iter())
446 .all(|sst| sst.filter_layout == PbSstableFilterLayout::Blocked),
447 "fast compaction requires blocked-filter SSTs: {}",
448 compact_task_to_string(&task)
449 );
450 let left = Box::new(ConcatSstableIterator::new(
451 left_ssts,
452 context.sstable_store.clone(),
453 task_progress.clone(),
454 context.storage_opts.compactor_iter_max_io_retry_times,
455 ));
456 let right = Box::new(ConcatSstableIterator::new(
457 right_ssts,
458 context.sstable_store,
459 task_progress.clone(),
460 context.storage_opts.compactor_iter_max_io_retry_times,
461 ));
462
463 let pk_prefix_state = PkPrefixSkipWatermarkState::from_safe_epoch_watermarks(
465 task.pk_prefix_table_watermarks.clone(),
466 );
467 let non_pk_prefix_state = NonPkPrefixSkipWatermarkState::from_safe_epoch_watermarks(
468 task.non_pk_prefix_table_watermarks.clone(),
469 compaction_catalog_agent_ref.clone(),
470 );
471 let value_skip_watermark_state = ValueSkipWatermarkState::from_safe_epoch_watermarks(
472 task.value_table_watermarks.clone(),
473 compaction_catalog_agent_ref,
474 );
475
476 Self {
477 executor: CompactTaskExecutor::new(
478 sst_builder,
479 task_config,
480 task_progress,
481 pk_prefix_state,
482 non_pk_prefix_state,
483 value_skip_watermark_state,
484 compaction_filter,
485 read_table_ids,
486 ),
487 left,
488 right,
489 task_id: task.task_id,
490 metrics: context.compactor_metrics,
491 compression_algorithm,
492 }
493 }
494
495 pub async fn run(mut self) -> HummockResult<(Vec<LocalSstableInfo>, CompactionStatistics)> {
496 self.left.rewind().await?;
497 self.right.rewind().await?;
498 let mut skip_raw_block_count = 0;
499 let mut skip_raw_block_size = 0;
500 while self.left.is_valid() && self.right.is_valid() {
501 let ret = self
502 .left
503 .current_sstable()
504 .key()
505 .cmp(&self.right.current_sstable().key());
506 let (first, second) = if ret == Ordering::Less {
507 (&mut self.left, &mut self.right)
508 } else {
509 (&mut self.right, &mut self.left)
510 };
511 assert!(
512 ret != Ordering::Equal,
513 "sst range overlap equal_key {:?}",
514 self.left.current_sstable().key()
515 );
516 if first.current_sstable().iter.is_none() {
517 let right_key = second.current_sstable().key();
518 while first.current_sstable().is_valid() && !self.executor.builder.need_flush() {
519 let full_key = FullKey::decode(first.current_sstable().next_block_largest());
520 if full_key.user_key.ge(&right_key.user_key) {
523 break;
524 }
525 let smallest_key =
526 FullKey::decode(first.current_sstable().next_block_smallest());
527 if !self.executor.shall_copy_raw_block(&smallest_key) {
528 break;
529 }
530 let smallest_key = smallest_key.to_vec();
531
532 let (mut block, filter_data, mut meta) = first
533 .current_sstable()
534 .download_next_block()
535 .await?
536 .unwrap();
537 let algorithm = Block::get_algorithm(&block)?;
538 if algorithm == CompressionAlgorithm::None
539 && algorithm != self.compression_algorithm
540 {
541 block = BlockBuilder::compress_block(block, self.compression_algorithm)?;
542 meta.len = block.len() as u32;
543 }
544
545 let largest_key = first.current_sstable().current_block_largest();
546 let block_len = block.len() as u64;
547 let block_key_count = meta.total_key_count;
548
549 if self
550 .executor
551 .builder
552 .add_raw_block(block, filter_data, smallest_key, largest_key, meta)
553 .await?
554 {
555 skip_raw_block_size += block_len;
556 skip_raw_block_count += 1;
557 }
558 self.executor.may_report_process_key(block_key_count);
559 self.executor.clear();
560 }
561 if !first.current_sstable().is_valid() {
562 first.next_sstable().await?;
563 continue;
564 }
565 first.init_block_iter().await?;
566 }
567
568 let target_key = second.current_sstable().key();
569 let iter = first.sstable_iter.as_mut().unwrap().iter.as_mut().unwrap();
570 self.executor.reset_watermark();
571 self.executor.run(iter, target_key).await?;
572 if !iter.is_valid() {
573 first.sstable_iter.as_mut().unwrap().iter.take();
574 if !first.current_sstable().is_valid() {
575 first.next_sstable().await?;
576 }
577 }
578 }
579 let rest_data = if !self.left.is_valid() {
580 &mut self.right
581 } else {
582 &mut self.left
583 };
584 if rest_data.is_valid() {
585 let sstable_iter = rest_data.sstable_iter.as_mut().unwrap();
587 let target_key = FullKey::decode(&sstable_iter.sstable.meta.largest_key);
588 if let Some(iter) = sstable_iter.iter.as_mut() {
589 self.executor.reset_watermark();
590 self.executor.run(iter, target_key).await?;
591 assert!(
592 !iter.is_valid(),
593 "iter should not be valid key {:?}",
594 iter.key()
595 );
596 }
597 sstable_iter.iter.take();
598 }
599
600 while rest_data.is_valid() {
601 let mut sstable_iter = rest_data.sstable_iter.take().unwrap();
602 while sstable_iter.is_valid() {
603 let smallest_key = FullKey::decode(sstable_iter.next_block_smallest()).to_vec();
604 let (block, filter_data, block_meta) =
605 sstable_iter.download_next_block().await?.unwrap();
606 let need_deleted = self.executor.last_key.user_key.eq(&smallest_key.user_key)
608 && self.executor.last_key_is_delete;
609 if self.executor.builder.need_flush()
610 || need_deleted
611 || !self.executor.shall_copy_raw_block(&smallest_key.to_ref())
612 {
613 let largest_key = sstable_iter.sstable.meta.largest_key.clone();
614 let target_key = FullKey::decode(&largest_key);
615 sstable_iter.init_block_iter(block, block_meta.uncompressed_size as usize)?;
616 let mut iter = sstable_iter.iter.take().unwrap();
617 self.executor.reset_watermark();
618 self.executor.run(&mut iter, target_key).await?;
619 } else {
620 let largest_key = sstable_iter.current_block_largest();
621 let block_len = block.len() as u64;
622 let block_key_count = block_meta.total_key_count;
623 if self
624 .executor
625 .builder
626 .add_raw_block(block, filter_data, smallest_key, largest_key, block_meta)
627 .await?
628 {
629 skip_raw_block_count += 1;
630 skip_raw_block_size += block_len;
631 }
632 self.executor.may_report_process_key(block_key_count);
633 self.executor.clear();
634 }
635 }
636 rest_data.next_sstable().await?;
637 }
638 let mut total_read_bytes = 0;
639 for sst in &self.left.sstables {
640 total_read_bytes += sst.sst_size;
641 }
642 for sst in &self.right.sstables {
643 total_read_bytes += sst.sst_size;
644 }
645 self.metrics
646 .compact_fast_runner_bytes
647 .inc_by(skip_raw_block_size);
648 tracing::info!(
649 "OPTIMIZATION: skip {} blocks for task-{}, optimize {}% data compression",
650 skip_raw_block_count,
651 self.task_id,
652 skip_raw_block_size * 100 / total_read_bytes,
653 );
654
655 let statistic = self.executor.take_statistics();
656 let output_ssts = self.executor.builder.finish().await?;
657 Compactor::report_progress(
658 self.metrics.clone(),
659 Some(self.executor.task_progress.clone()),
660 &output_ssts,
661 false,
662 );
663 let sst_infos = output_ssts
664 .iter()
665 .map(|sst| sst.sst_info.clone())
666 .collect_vec();
667 assert!(can_concat(&sst_infos));
668 Ok((output_ssts, statistic))
669 }
670}
671
/// Applies the per-key drop rules of a compaction task and feeds surviving entries into the
/// output builder.
pub struct CompactTaskExecutor<F: TableBuilderFactory, C: CompactionFilter> {
    /// First-seen version of the user key currently being processed by `run`; reset by `clear`.
    last_key: FullKey<Vec<u8>>,
    /// Statistics accumulated over the whole task.
    compaction_statistics: CompactionStatistics,
    /// Table id that `last_table_stats` currently refers to.
    last_table_id: Option<TableId>,
    /// Drop deltas for `last_table_id`, flushed into `compaction_statistics` on table change.
    last_table_stats: TableStats,
    /// Output SSTable builder.
    builder: CapacitySplitTableBuilder<F>,
    /// Task-level configuration (GC of deletes, version retention, ...).
    task_config: TaskConfig,
    /// Shared progress reporter for the task.
    task_progress: Arc<TaskProgress>,
    /// Watermark state consulted by `watermark_should_delete` and `watermark_may_delete`.
    pk_prefix_skip_watermark_state: PkPrefixSkipWatermarkState,
    /// Whether a version of `last_key`'s user key was dropped as a delete or by a watermark.
    last_key_is_delete: bool,
    /// Keys processed since the last progress report.
    progress_key_num: u32,
    /// Watermark state consulted by `watermark_should_delete` and `watermark_may_delete`.
    non_pk_prefix_skip_watermark_state: NonPkPrefixSkipWatermarkState,
    /// Watermark state consulted by `watermark_should_delete` and `watermark_may_delete`.
    value_skip_watermark_state: ValueSkipWatermarkState,
    /// User-supplied filter that can drop individual keys.
    compaction_filter: C,
    /// Table ids to process; blocks of any other table are skipped.
    read_table_ids: HashSet<TableId>,
}
688
689impl<F: TableBuilderFactory, C: CompactionFilter> CompactTaskExecutor<F, C> {
690 pub fn new(
691 builder: CapacitySplitTableBuilder<F>,
692 task_config: TaskConfig,
693 task_progress: Arc<TaskProgress>,
694 pk_prefix_skip_watermark_state: PkPrefixSkipWatermarkState,
695 non_pk_prefix_skip_watermark_state: NonPkPrefixSkipWatermarkState,
696 value_skip_watermark_state: ValueSkipWatermarkState,
697 compaction_filter: C,
698 read_table_ids: HashSet<TableId>,
699 ) -> Self {
700 Self {
701 builder,
702 task_config,
703 last_key: FullKey::default(),
704 last_key_is_delete: false,
705 compaction_statistics: CompactionStatistics::default(),
706 last_table_id: None,
707 last_table_stats: TableStats::default(),
708 task_progress,
709 pk_prefix_skip_watermark_state,
710 progress_key_num: 0,
711 non_pk_prefix_skip_watermark_state,
712 value_skip_watermark_state,
713 compaction_filter,
714 read_table_ids,
715 }
716 }
717
718 fn take_statistics(&mut self) -> CompactionStatistics {
719 if let Some(last_table_id) = self.last_table_id.take() {
720 self.compaction_statistics
721 .delta_drop_stat
722 .insert(last_table_id, std::mem::take(&mut self.last_table_stats));
723 }
724 std::mem::take(&mut self.compaction_statistics)
725 }
726
727 fn clear(&mut self) {
728 if !self.last_key.is_empty() {
729 self.last_key = FullKey::default();
730 }
731 self.last_key_is_delete = false;
732 }
733
734 fn reset_watermark(&mut self) {
735 self.pk_prefix_skip_watermark_state.reset_watermark();
736 self.non_pk_prefix_skip_watermark_state.reset_watermark();
737 self.value_skip_watermark_state.reset_watermark();
738 }
739
740 #[inline(always)]
741 fn should_skip_block(&self, table_id: TableId) -> bool {
742 !self.read_table_ids.contains(&table_id)
743 }
744
745 #[inline(always)]
746 fn may_report_process_key(&mut self, key_count: u32) {
747 const PROGRESS_KEY_INTERVAL: u32 = 100;
748 self.progress_key_num += key_count;
749 if self.progress_key_num > PROGRESS_KEY_INTERVAL {
750 self.task_progress
751 .inc_progress_key(self.progress_key_num as u64);
752 self.progress_key_num = 0;
753 }
754 }
755
756 pub async fn run(
757 &mut self,
758 iter: &mut BlockIterator,
759 target_key: FullKey<&[u8]>,
760 ) -> HummockResult<()> {
761 if self.should_skip_block(iter.table_id()) {
762 iter.finish_block();
763 return Ok(());
764 }
765
766 while iter.is_valid() && iter.key().le(&target_key) {
767 let is_new_user_key =
768 !self.last_key.is_empty() && iter.key().user_key != self.last_key.user_key.as_ref();
769 self.compaction_statistics.iter_total_key_counts += 1;
770 self.may_report_process_key(1);
771
772 let mut drop = false;
773 let value = HummockValue::from_slice(iter.value()).unwrap();
774 let is_first_or_new_user_key = is_new_user_key || self.last_key.is_empty();
775 if is_first_or_new_user_key {
776 self.last_key.set(iter.key());
777 self.last_key_is_delete = false;
778 }
779
780 if !self.task_config.retain_multiple_version
782 && self.task_config.gc_delete_keys
783 && value.is_delete()
784 {
785 drop = true;
786 self.last_key_is_delete = true;
787 } else if !self.task_config.retain_multiple_version && !is_first_or_new_user_key {
788 drop = true;
789 }
790
791 if !drop && self.compaction_filter.should_delete(iter.key()) {
792 drop = true;
793 }
794
795 if !drop && self.watermark_should_delete(&iter.key(), value) {
796 drop = true;
797 self.last_key_is_delete = true;
798 }
799
800 if self.last_table_id != Some(self.last_key.user_key.table_id) {
801 if let Some(last_table_id) = self.last_table_id.take() {
802 self.compaction_statistics
803 .delta_drop_stat
804 .insert(last_table_id, std::mem::take(&mut self.last_table_stats));
805 }
806 self.last_table_id = Some(self.last_key.user_key.table_id);
807 }
808
809 if drop {
810 self.compaction_statistics.iter_drop_key_counts += 1;
811
812 self.last_table_stats.total_key_count -= 1;
813 self.last_table_stats.total_key_size -= self.last_key.encoded_len() as i64;
814 self.last_table_stats.total_value_size -= value.encoded_len() as i64;
815 iter.next();
816 continue;
817 }
818 self.builder
819 .add_full_key(iter.key(), value, is_new_user_key)
820 .await?;
821 iter.next();
822 }
823 Ok(())
824 }
825
826 pub fn shall_copy_raw_block(&mut self, smallest_key: &FullKey<&[u8]>) -> bool {
827 if self.should_skip_block(smallest_key.user_key.table_id) {
828 return false;
830 }
831
832 if self.last_key_is_delete && self.last_key.user_key.as_ref().eq(&smallest_key.user_key) {
833 return false;
836 }
837
838 if self.watermark_may_delete(smallest_key) {
839 return false;
840 }
841
842 if self.compaction_filter.should_delete(*smallest_key) {
844 return false;
845 }
846
847 true
848 }
849
850 fn watermark_may_delete(&mut self, key: &FullKey<&[u8]>) -> bool {
851 let pk_prefix_has_watermark = self.pk_prefix_skip_watermark_state.has_watermark();
853 let non_pk_prefix_has_watermark = self.non_pk_prefix_skip_watermark_state.has_watermark();
854 if pk_prefix_has_watermark || non_pk_prefix_has_watermark {
855 let unused = vec![];
856 let unused_put = HummockValue::Put(unused.as_slice());
857 if (pk_prefix_has_watermark
858 && self
859 .pk_prefix_skip_watermark_state
860 .should_delete(key, unused_put))
861 || (non_pk_prefix_has_watermark
862 && self
863 .non_pk_prefix_skip_watermark_state
864 .should_delete(key, unused_put))
865 {
866 return true;
867 }
868 }
869 self.value_skip_watermark_state.has_watermark()
870 && self.value_skip_watermark_state.may_delete(key)
871 }
872
873 fn watermark_should_delete(
874 &mut self,
875 key: &FullKey<&[u8]>,
876 value: HummockValue<&[u8]>,
877 ) -> bool {
878 (self.pk_prefix_skip_watermark_state.has_watermark()
879 && self
880 .pk_prefix_skip_watermark_state
881 .should_delete(key, value))
882 || (self.non_pk_prefix_skip_watermark_state.has_watermark()
883 && self
884 .non_pk_prefix_skip_watermark_state
885 .should_delete(key, value))
886 || (self.value_skip_watermark_state.has_watermark()
887 && self.value_skip_watermark_state.should_delete(key, value))
888 }
889}
890
#[cfg(test)]
mod tests {
    use std::collections::{HashMap, VecDeque};
    use std::sync::Arc;

    use risingwave_common::catalog::TableId;
    use risingwave_common::hash::VirtualNode;
    use risingwave_common::util::epoch::test_epoch;
    use risingwave_hummock_sdk::compact_task::CompactTask;
    use risingwave_hummock_sdk::key::FullKey;
    use risingwave_hummock_sdk::level::InputLevel;
    use risingwave_pb::hummock::compact_task::TaskType;
    use risingwave_pb::hummock::{LevelType, PbSstableFilterLayout, PbSstableFilterType};

    use super::CompactorRunner;
    use crate::compaction_catalog_manager::CompactionCatalogAgent;
    use crate::hummock::compactor::compaction_utils::optimize_by_copy_block;
    use crate::hummock::compactor::task_progress::TaskProgress;
    use crate::hummock::compactor::{CompactorContext, MultiCompactionFilter};
    use crate::hummock::iterator::test_utils::mock_sstable_store;
    use crate::hummock::test_utils::{
        default_builder_opt_for_test, default_opts_for_test, gen_test_sstable_impl, test_value_of,
    };
    use crate::hummock::value::HummockValue;
    use crate::hummock::{
        BlockedXor16FilterBuilder, CachePolicy, SharedComapctorObjectIdManager, Xor16FilterBuilder,
    };
    use crate::monitor::CompactorMetrics;

    /// Full key for `table_id` whose table key is the zero vnode followed by `key_test_<idx>`.
    fn test_key(table_id: u32, idx: usize) -> FullKey<Vec<u8>> {
        let suffix = format!("key_test_{idx:05}");
        let mut table_key = VirtualNode::ZERO.to_be_bytes().to_vec();
        table_key.extend_from_slice(suffix.as_bytes());
        FullKey::for_test(TableId::new(table_id), table_key, test_epoch(1))
    }

    /// A plain-filter SST whose `table_ids` list is empty must not be read by the fast
    /// compaction runner, so the task still qualifies for fast compaction and runs successfully.
    #[tokio::test]
    async fn test_fast_compact_skips_empty_table_id_sst() {
        let store = mock_sstable_store().await;
        let vnode_counts = HashMap::from([
            (1, VirtualNode::COUNT_FOR_TEST),
            (2, VirtualNode::COUNT_FOR_TEST),
        ]);
        let watermark_serdes = HashMap::from([(1, None), (2, None)]);

        // SST of table 1, built with the plain filter layout, then stripped of its table ids.
        let mut orphan_sst = gen_test_sstable_impl::<_, Xor16FilterBuilder>(
            default_builder_opt_for_test(),
            1,
            (0..2).map(|idx| (test_key(1, idx), HummockValue::put(test_value_of(idx)))),
            store.clone(),
            CachePolicy::NotFill,
            vnode_counts.clone(),
            watermark_serdes.clone(),
        )
        .await;
        assert_eq!(orphan_sst.filter_layout, PbSstableFilterLayout::Plain);
        let mut orphan_inner = orphan_sst.get_inner();
        orphan_inner.table_ids.clear();
        orphan_sst.set_inner(orphan_inner);

        // Two blocked-filter SSTs of table 2 with disjoint key ranges, one per input level.
        let upper_level_sst = gen_test_sstable_impl::<_, BlockedXor16FilterBuilder>(
            default_builder_opt_for_test(),
            2,
            (0..2).map(|idx| (test_key(2, idx), HummockValue::put(test_value_of(idx)))),
            store.clone(),
            CachePolicy::NotFill,
            vnode_counts.clone(),
            watermark_serdes.clone(),
        )
        .await;
        let lower_level_sst = gen_test_sstable_impl::<_, BlockedXor16FilterBuilder>(
            default_builder_opt_for_test(),
            3,
            (2..4).map(|idx| (test_key(2, idx), HummockValue::put(test_value_of(idx)))),
            store.clone(),
            CachePolicy::NotFill,
            vnode_counts,
            watermark_serdes,
        )
        .await;

        let mut opts = default_opts_for_test();
        opts.enable_fast_compaction = true;
        opts.compactor_fast_max_compact_task_size = u64::MAX;
        opts.compactor_fast_max_compact_delete_ratio = 100;
        let context = CompactorContext::new_local_compact_context(
            Arc::new(opts),
            store,
            Arc::new(CompactorMetrics::unused()),
            None,
        );

        let input_ssts = vec![
            InputLevel {
                level_idx: 1,
                level_type: LevelType::Nonoverlapping,
                table_infos: vec![orphan_sst, upper_level_sst],
            },
            InputLevel {
                level_idx: 2,
                level_type: LevelType::Nonoverlapping,
                table_infos: vec![lower_level_sst],
            },
        ];
        let task = CompactTask {
            input_ssts,
            task_id: 42,
            target_level: 2,
            existing_table_ids: vec![TableId::new(2)],
            target_file_size: 1 << 20,
            task_type: TaskType::Dynamic,
            blocked_xor_filter_kv_count_threshold: Some(0),
            sstable_filter_type: PbSstableFilterType::SstableFilterXor16,
            ..Default::default()
        };

        // Only one of the two SSTs in the first level is readable.
        assert_eq!(task.input_ssts[0].read_sstable_infos().count(), 1);
        assert!(optimize_by_copy_block(&task, &context));

        let runner = CompactorRunner::<BlockedXor16FilterBuilder, _>::new(
            context,
            task,
            CompactionCatalogAgent::for_test(vec![1, 2]),
            SharedComapctorObjectIdManager::for_test(VecDeque::from([100])),
            Arc::new(TaskProgress::default()),
            MultiCompactionFilter::default(),
        );
        runner.run().await.unwrap();
    }
}