1use std::collections::{BTreeMap, HashMap};
16use std::marker::PhantomData;
17use std::ops::Bound;
18use std::sync::Arc;
19use std::sync::atomic::{AtomicU64, Ordering};
20
21use bytes::Bytes;
22use itertools::Itertools;
23use risingwave_common::catalog::TableId;
24use risingwave_common::config::meta::default::compaction_config;
25use risingwave_common::constants::hummock::CompactionFilterFlag;
26use risingwave_hummock_sdk::compact_task::CompactTask;
27use risingwave_hummock_sdk::key::FullKey;
28use risingwave_hummock_sdk::key_range::KeyRange;
29use risingwave_hummock_sdk::sstable_info::SstableInfo;
30use risingwave_hummock_sdk::table_stats::TableStatsMap;
31use risingwave_hummock_sdk::{EpochWithGap, KeyComparator, can_concat};
32use risingwave_pb::hummock::compact_task::PbTaskType;
33use risingwave_pb::hummock::{BloomFilterType, PbLevelType, PbSstableFilterType, PbTableSchema};
34use tokio::time::Instant;
35
36pub use super::context::CompactorContext;
37use crate::compaction_catalog_manager::CompactionCatalogAgentRef;
38use crate::hummock::compactor::{
39 ConcatSstableIterator, MultiCompactionFilter, TaskProgress, TtlCompactionFilter,
40};
41use crate::hummock::iterator::{
42 Forward, HummockIterator, MergeIterator, NonPkPrefixSkipWatermarkIterator,
43 NonPkPrefixSkipWatermarkState, PkPrefixSkipWatermarkIterator, PkPrefixSkipWatermarkState,
44 UserIterator,
45};
46use crate::hummock::multi_builder::TableBuilderFactory;
47use crate::hummock::{
48 CachePolicy, FilterBuilder, GetObjectId, HummockResult, MemoryLimiter, SstableBuilder,
49 SstableBuilderOptions, SstableWriterFactory, SstableWriterOptions,
50};
51use crate::monitor::StoreLocalStatistic;
52
/// [`TableBuilderFactory`] that allocates SST object ids through a [`GetObjectId`] and opens
/// [`SstableBuilder`]s whose writers are produced by an [`SstableWriterFactory`].
pub struct RemoteBuilderFactory<W: SstableWriterFactory, F: FilterBuilder> {
    /// Source of new SST object ids; awaited once per opened builder.
    pub object_id_getter: Arc<dyn GetObjectId>,
    /// Memory limiter handed (cloned) to every builder.
    pub limiter: Arc<MemoryLimiter>,
    /// Builder options, cloned into each builder. `capacity + block_capacity` is also used
    /// as the writer's capacity hint.
    pub options: SstableBuilderOptions,
    /// Cache policy passed to each SST writer.
    pub policy: CachePolicy,
    /// Accumulated time spent waiting for object id allocation, in microseconds.
    pub remote_rpc_cost: Arc<AtomicU64>,
    /// Compaction catalog agent, cloned into each builder.
    pub compaction_catalog_agent_ref: CompactionCatalogAgentRef,
    /// Factory creating the underlying SST writers.
    pub sstable_writer_factory: W,
    /// Marks the filter builder type `F` without storing a value of it.
    pub _phantom: PhantomData<F>,
}
63
64#[async_trait::async_trait]
65impl<W: SstableWriterFactory, F: FilterBuilder> TableBuilderFactory for RemoteBuilderFactory<W, F> {
66 type Filter = F;
67 type Writer = W::Writer;
68
69 async fn open_builder(&mut self) -> HummockResult<SstableBuilder<Self::Writer, Self::Filter>> {
70 let timer = Instant::now();
71 let table_id = self.object_id_getter.get_new_sst_object_id().await?;
72 let cost = (timer.elapsed().as_secs_f64() * 1000000.0).round() as u64;
73 self.remote_rpc_cost.fetch_add(cost, Ordering::Relaxed);
74 let writer_options = SstableWriterOptions {
75 capacity_hint: Some(self.options.capacity + self.options.block_capacity),
76 tracker: None,
77 policy: self.policy,
78 };
79 let writer = self
80 .sstable_writer_factory
81 .create_sst_writer(table_id, writer_options)
82 .await?;
83 let builder = SstableBuilder::new(
84 table_id,
85 writer,
86 Self::Filter::create(self.options.filter_builder_options()),
87 self.options.clone(),
88 self.compaction_catalog_agent_ref.clone(),
89 Some(self.limiter.clone()),
90 );
91 Ok(builder)
92 }
93}
94
/// Counters describing a compaction run.
///
/// NOTE(review): presumably filled in while iterating the task input; confirm at the
/// call sites that populate it.
#[derive(Default, Debug)]
pub struct CompactionStatistics {
    /// Per-table statistics delta for dropped data.
    /// NOTE(review): sign convention of the delta is not visible here — confirm.
    pub delta_drop_stat: TableStatsMap,

    /// Total number of keys seen by the iterator (denominator of `delete_ratio`).
    pub iter_total_key_counts: u64,
    /// Number of iterated keys that were dropped (numerator of `delete_ratio`).
    pub iter_drop_key_counts: u64,
}
105
106impl CompactionStatistics {
107 #[expect(dead_code)]
108 fn delete_ratio(&self) -> Option<u64> {
109 if self.iter_total_key_counts == 0 {
110 return None;
111 }
112
113 Some(self.iter_drop_key_counts / self.iter_total_key_counts)
114 }
115}
116
/// Per-task compaction settings.
///
/// `TaskConfig::for_test` shows the test baseline: an XOR16 SST filter, an empty vnode
/// partition map, multiple-version retention off, and the drop-column optimization not
/// disabled.
#[derive(Clone, Default)]
pub struct TaskConfig {
    /// Key range associated with the task.
    pub(crate) key_range: KeyRange,
    /// Cache policy for the task's SST I/O.
    /// NOTE(review): confirm whether it applies to reads, writes, or both.
    pub(crate) cache_policy: CachePolicy,
    /// NOTE(review): presumably allows delete tombstones to be dropped; confirm.
    pub(crate) gc_delete_keys: bool,
    /// NOTE(review): presumably keeps older versions of a key instead of only the newest; confirm.
    pub(crate) retain_multiple_version: bool,
    /// Whether output SSTs use a block-based filter.
    /// NOTE(review): confirm at the builder call site.
    pub(crate) use_block_based_filter: bool,
    /// Filter type for output SSTs (`for_test` uses `SstableFilterXor16`).
    pub(crate) sstable_filter_kind: PbSstableFilterType,

    /// Per-table vnode partition value.
    /// NOTE(review): the meaning of the `u32` is not visible here.
    pub(crate) table_vnode_partition: BTreeMap<TableId, u32>,
    /// Table schemas keyed by table id.
    pub(crate) table_schemas: HashMap<TableId, PbTableSchema>,
    /// When `true`, turns off the drop-column optimization (`for_test` leaves it `false`).
    pub(crate) disable_drop_column_optimization: bool,
}
134
135impl TaskConfig {
136 #[cfg(any(test, feature = "test"))]
137 pub fn for_test(
138 key_range: KeyRange,
139 cache_policy: CachePolicy,
140 gc_delete_keys: bool,
141 use_block_based_filter: bool,
142 table_schemas: HashMap<TableId, PbTableSchema>,
143 ) -> Self {
144 Self {
145 key_range,
146 cache_policy,
147 gc_delete_keys,
148 retain_multiple_version: false,
149 use_block_based_filter,
150 sstable_filter_kind: PbSstableFilterType::SstableFilterXor16,
151 table_vnode_partition: BTreeMap::default(),
152 table_schemas,
153 disable_drop_column_optimization: false,
154 }
155 }
156
157 #[cfg(any(test, feature = "test"))]
158 pub fn with_disable_drop_column_optimization(mut self, disable: bool) -> Self {
159 self.disable_drop_column_optimization = disable;
160 self
161 }
162}
163
164pub fn build_multi_compaction_filter(compact_task: &CompactTask) -> MultiCompactionFilter {
165 let mut multi_filter = MultiCompactionFilter::default();
166 let compaction_filter_flag =
167 CompactionFilterFlag::from_bits(compact_task.compaction_filter_mask).unwrap_or_default();
168 if compaction_filter_flag.contains(CompactionFilterFlag::TTL) {
172 let id_to_ttl = compact_task
173 .table_options
174 .iter()
175 .filter_map(|(id, option)| {
176 option
177 .retention_seconds
178 .and_then(|ttl| if ttl > 0 { Some((*id, ttl)) } else { None })
179 })
180 .collect();
181
182 let ttl_filter = Box::new(TtlCompactionFilter::new(
183 id_to_ttl,
184 compact_task.current_epoch_time,
185 ));
186 multi_filter.register(ttl_filter);
187 }
188
189 multi_filter
190}
191
192fn generate_splits_fast(
193 sstable_infos: &[&SstableInfo],
194 compaction_size: u64,
195 context: &CompactorContext,
196 max_sub_compaction: u32,
197) -> Vec<KeyRange> {
198 let worker_num = context.compaction_executor.worker_num();
199 let parallel_compact_size = (context.storage_opts.parallel_compact_size_mb as u64) << 20;
200
201 let parallelism = calculate_task_parallelism_impl(
202 worker_num,
203 parallel_compact_size,
204 compaction_size,
205 max_sub_compaction,
206 );
207 let mut indexes = vec![];
208 for sst in sstable_infos {
209 let key_range = &sst.key_range;
210 indexes.push(
211 FullKey {
212 user_key: FullKey::decode(&key_range.left).user_key,
213 epoch_with_gap: EpochWithGap::new_max_epoch(),
214 }
215 .encode(),
216 );
217 indexes.push(
218 FullKey {
219 user_key: FullKey::decode(&key_range.right).user_key,
220 epoch_with_gap: EpochWithGap::new_max_epoch(),
221 }
222 .encode(),
223 );
224 }
225 indexes.sort_by(|a, b| KeyComparator::compare_encoded_full_key(a.as_ref(), b.as_ref()));
226 indexes.dedup();
227 if indexes.len() <= parallelism {
228 return vec![];
229 }
230
231 let mut splits = vec![];
232 splits.push(KeyRange::default());
233 let parallel_key_count = indexes.len() / parallelism;
234 let mut last_split_key_count = 0;
235 for key in indexes {
236 if last_split_key_count >= parallel_key_count {
237 splits.last_mut().unwrap().right = Bytes::from(key.clone());
238 splits.push(KeyRange::new(Bytes::from(key.clone()), Bytes::default()));
239 last_split_key_count = 0;
240 }
241 last_split_key_count += 1;
242 }
243
244 splits
245}
246
247pub async fn generate_splits(
248 sstable_infos: &[&SstableInfo],
249 compaction_size: u64,
250 context: &CompactorContext,
251 max_sub_compaction: u32,
252) -> HummockResult<Vec<KeyRange>> {
253 let parallel_compact_size = (context.storage_opts.parallel_compact_size_mb as u64) << 20;
254 if compaction_size > parallel_compact_size {
255 if sstable_infos.len() > context.storage_opts.compactor_max_preload_meta_file_count {
256 return Ok(generate_splits_fast(
257 sstable_infos,
258 compaction_size,
259 context,
260 max_sub_compaction,
261 ));
262 }
263 let mut indexes = vec![];
264 for sstable_info in sstable_infos {
266 let sstable = context
267 .sstable_store
268 .sstable(sstable_info, &mut StoreLocalStatistic::default())
269 .await?;
270 indexes.extend(sstable.meta.block_metas.iter().map(|block| {
271 let data_size = block.len;
272 let full_key = FullKey {
273 user_key: FullKey::decode(&block.smallest_key).user_key,
274 epoch_with_gap: EpochWithGap::new_max_epoch(),
275 }
276 .encode();
277 (data_size as u64, full_key)
278 }));
279 }
280 indexes.sort_by(|a, b| KeyComparator::compare_encoded_full_key(a.1.as_ref(), b.1.as_ref()));
282 let mut splits = vec![];
283 splits.push(KeyRange::default());
284
285 let parallelism = calculate_task_parallelism_impl(
286 context.compaction_executor.worker_num(),
287 parallel_compact_size,
288 compaction_size,
289 max_sub_compaction,
290 );
291
292 let sub_compaction_data_size =
293 std::cmp::max(compaction_size / parallelism as u64, parallel_compact_size);
294
295 if parallelism > 1 {
296 let mut last_buffer_size = 0;
297 let mut last_key: Vec<u8> = vec![];
298 let mut remaining_size = indexes.iter().map(|block| block.0).sum::<u64>();
299 for (data_size, key) in indexes {
300 if last_buffer_size >= sub_compaction_data_size
301 && !last_key.eq(&key)
302 && remaining_size > parallel_compact_size
303 {
304 splits.last_mut().unwrap().right = Bytes::from(key.clone());
305 splits.push(KeyRange::new(Bytes::from(key.clone()), Bytes::default()));
306 last_buffer_size = data_size;
307 } else {
308 last_buffer_size += data_size;
309 }
310 remaining_size -= data_size;
311 last_key = key;
312 }
313 return Ok(splits);
314 }
315 }
316
317 Ok(vec![])
318}
319
320pub fn estimate_task_output_capacity(context: CompactorContext, task: &CompactTask) -> usize {
321 let max_target_file_size = context.storage_opts.sstable_size_mb as usize * (1 << 20);
322 let total_input_uncompressed_file_size = task
323 .read_input_ssts()
324 .map(|table| table.uncompressed_file_size)
325 .sum::<u64>();
326
327 let capacity = std::cmp::min(task.target_file_size as usize, max_target_file_size);
328 std::cmp::min(capacity, total_input_uncompressed_file_size as usize)
329}
330
/// Estimates how many keys fit into an output of `output_capacity` bytes, assuming keys
/// are spread evenly over `total_size` bytes of input holding `total_key_count` keys.
///
/// The result is rounded up and never exceeds `total_key_count`. The special cases are
/// checked in this order:
/// - no keys gives 0;
/// - a zero `total_size` gives all keys;
/// - a zero `output_capacity` gives 0.
pub fn estimate_output_key_count_by_size(
    total_key_count: u64,
    total_size: u64,
    output_capacity: usize,
) -> usize {
    match (total_key_count, total_size, output_capacity) {
        (0, _, _) => 0,
        (keys, 0, _) => usize::try_from(keys).unwrap_or(usize::MAX),
        (_, _, 0) => 0,
        (keys, size, capacity) => {
            // u128 keeps `keys * capacity` from overflowing.
            let keys = u128::from(keys);
            let scaled = (keys * capacity as u128).div_ceil(u128::from(size));
            usize::try_from(scaled.min(keys)).unwrap_or(usize::MAX)
        }
    }
}
354
355pub fn estimate_output_key_count_for_input_ssts<'a>(
356 input_ssts: impl Iterator<Item = &'a SstableInfo>,
357 output_capacity: usize,
358) -> usize {
359 let (total_key_count, total_uncompressed_size) =
360 input_ssts.fold((0u64, 0u64), |(key_count, size), sst| {
361 (
362 key_count + sst.total_key_count,
363 size + sst.uncompressed_file_size,
364 )
365 });
366
367 estimate_output_key_count_by_size(total_key_count, total_uncompressed_size, output_capacity)
368}
369
370pub fn estimate_output_key_count_for_task(task: &CompactTask, output_capacity: usize) -> usize {
371 estimate_output_key_count_for_input_ssts(task.read_input_ssts(), output_capacity)
372}
373
374pub fn blocked_xor_filter_key_count_threshold(
375 blocked_xor_filter_kv_count_threshold: Option<u64>,
376) -> usize {
377 blocked_xor_filter_kv_count_threshold
378 .unwrap_or(compaction_config::DEFAULT_BLOCKED_XOR_FILTER_KV_COUNT_THRESHOLD)
379 .try_into()
380 .unwrap_or(usize::MAX)
381}
382
383pub async fn check_compaction_result(
385 compact_task: &CompactTask,
386 context: CompactorContext,
387 compaction_catalog_agent_ref: CompactionCatalogAgentRef,
388) -> HummockResult<bool> {
389 if compact_task.contains_ttl() {
391 return Ok(true);
392 }
393
394 let mut table_iters = Vec::new();
395
396 for level in &compact_task.input_ssts {
397 if level.level_type == PbLevelType::Nonoverlapping {
398 let tables = level.read_sstable_infos().cloned().collect_vec();
399 if tables.is_empty() {
400 continue;
401 }
402 debug_assert!(can_concat(&tables));
403
404 table_iters.push(ConcatSstableIterator::new(
405 tables,
406 KeyRange::inf(),
407 context.sstable_store.clone(),
408 Arc::new(TaskProgress::default()),
409 context.storage_opts.compactor_iter_max_io_retry_times,
410 ));
411 } else {
412 for table_info in level.read_sstable_infos().cloned() {
413 table_iters.push(ConcatSstableIterator::new(
414 vec![table_info],
415 KeyRange::inf(),
416 context.sstable_store.clone(),
417 Arc::new(TaskProgress::default()),
418 context.storage_opts.compactor_iter_max_io_retry_times,
419 ));
420 }
421 }
422 }
423
424 let iter = MergeIterator::for_compactor(table_iters);
425 let left_iter = {
426 let skip_watermark_iter = PkPrefixSkipWatermarkIterator::new(
427 iter,
428 PkPrefixSkipWatermarkState::from_safe_epoch_watermarks(
429 compact_task.pk_prefix_table_watermarks.clone(),
430 ),
431 );
432
433 let combine_iter = NonPkPrefixSkipWatermarkIterator::new(
434 skip_watermark_iter,
435 NonPkPrefixSkipWatermarkState::from_safe_epoch_watermarks(
436 compact_task.non_pk_prefix_table_watermarks.clone(),
437 compaction_catalog_agent_ref.clone(),
438 ),
439 );
440
441 UserIterator::new(
442 combine_iter,
443 (Bound::Unbounded, Bound::Unbounded),
444 u64::MAX,
445 0,
446 None,
447 )
448 };
449 let iter = ConcatSstableIterator::new(
450 compact_task.sorted_output_ssts.clone(),
451 KeyRange::inf(),
452 context.sstable_store.clone(),
453 Arc::new(TaskProgress::default()),
454 context.storage_opts.compactor_iter_max_io_retry_times,
455 );
456 let right_iter = {
457 let skip_watermark_iter = PkPrefixSkipWatermarkIterator::new(
458 iter,
459 PkPrefixSkipWatermarkState::from_safe_epoch_watermarks(
460 compact_task.pk_prefix_table_watermarks.clone(),
461 ),
462 );
463
464 let combine_iter = NonPkPrefixSkipWatermarkIterator::new(
465 skip_watermark_iter,
466 NonPkPrefixSkipWatermarkState::from_safe_epoch_watermarks(
467 compact_task.non_pk_prefix_table_watermarks.clone(),
468 compaction_catalog_agent_ref,
469 ),
470 );
471
472 UserIterator::new(
473 combine_iter,
474 (Bound::Unbounded, Bound::Unbounded),
475 u64::MAX,
476 0,
477 None,
478 )
479 };
480
481 check_result(left_iter, right_iter).await
482}
483
484pub async fn check_flush_result<I: HummockIterator<Direction = Forward>>(
485 left_iter: UserIterator<I>,
486 sort_ssts: Vec<SstableInfo>,
487 context: CompactorContext,
488) -> HummockResult<bool> {
489 let iter = ConcatSstableIterator::new(
490 sort_ssts,
491 KeyRange::inf(),
492 context.sstable_store.clone(),
493 Arc::new(TaskProgress::default()),
494 0,
495 );
496 let right_iter = UserIterator::new(
497 iter,
498 (Bound::Unbounded, Bound::Unbounded),
499 u64::MAX,
500 0,
501 None,
502 );
503 check_result(left_iter, right_iter).await
504}
505
506async fn check_result<
507 I1: HummockIterator<Direction = Forward>,
508 I2: HummockIterator<Direction = Forward>,
509>(
510 mut left_iter: UserIterator<I1>,
511 mut right_iter: UserIterator<I2>,
512) -> HummockResult<bool> {
513 left_iter.rewind().await?;
514 right_iter.rewind().await?;
515 let mut right_count = 0;
516 let mut left_count = 0;
517 while left_iter.is_valid() && right_iter.is_valid() {
518 if left_iter.key() != right_iter.key() {
519 tracing::error!(
520 "The key of input and output not equal. key: {:?} vs {:?}",
521 left_iter.key(),
522 right_iter.key()
523 );
524 return Ok(false);
525 }
526 if left_iter.value() != right_iter.value() {
527 tracing::error!(
528 "The value of input and output not equal. key: {:?}, value: {:?} vs {:?}",
529 left_iter.key(),
530 left_iter.value(),
531 right_iter.value()
532 );
533 return Ok(false);
534 }
535 left_iter.next().await?;
536 right_iter.next().await?;
537 left_count += 1;
538 right_count += 1;
539 }
540 while left_iter.is_valid() {
541 left_count += 1;
542 left_iter.next().await?;
543 }
544 while right_iter.is_valid() {
545 right_count += 1;
546 right_iter.next().await?;
547 }
548 if left_count != right_count {
549 tracing::error!(
550 "The key count of input and output not equal: {} vs {}",
551 left_count,
552 right_count
553 );
554 return Ok(false);
555 }
556 Ok(true)
557}
558
559pub fn optimize_by_copy_block(compact_task: &CompactTask, context: &CompactorContext) -> bool {
560 let input_ssts = compact_task.read_input_ssts().collect_vec();
561 let compaction_size = input_ssts_size(&input_ssts);
562 optimize_by_copy_block_with_input(compact_task, context, &input_ssts, compaction_size)
563}
564
565fn optimize_by_copy_block_with_input(
566 compact_task: &CompactTask,
567 context: &CompactorContext,
568 input_ssts: &[&SstableInfo],
569 compaction_size: u64,
570) -> bool {
571 let all_ssts_are_blocked_filter = input_ssts
572 .iter()
573 .all(|table_info| table_info.bloom_filter_kind == BloomFilterType::Blocked);
574 let current_filter_type = compact_task.sstable_filter_kind;
575 let all_ssts_match_filter_family = input_ssts
576 .iter()
577 .all(|table_info| table_info.filter_type_compatible_with(current_filter_type));
578 let output_capacity = estimate_task_output_capacity(context.clone(), compact_task);
583 let estimated_output_key_count =
584 estimate_output_key_count_for_input_ssts(input_ssts.iter().copied(), output_capacity);
585 let output_wants_blocked_filter =
586 compact_task.should_use_block_based_filter_for_output(estimated_output_key_count as u64);
587
588 let delete_key_count = input_ssts
589 .iter()
590 .map(|table_info| table_info.stale_key_count + table_info.range_tombstone_count)
591 .sum::<u64>();
592 let total_key_count = input_ssts
593 .iter()
594 .map(|table_info| table_info.total_key_count)
595 .sum::<u64>();
596
597 let single_table = compact_task.get_table_ids_from_input_ssts().count() == 1;
598 context.storage_opts.enable_fast_compaction
599 && current_filter_type == PbSstableFilterType::SstableFilterXor16
600 && all_ssts_are_blocked_filter
601 && all_ssts_match_filter_family
602 && output_wants_blocked_filter
603 && !compact_task.contains_range_tombstone()
604 && !compact_task.contains_ttl()
605 && !compact_task.contains_split_sst()
606 && single_table
607 && compact_task.target_level > 0
608 && compact_task.input_ssts.len() == 2
609 && compaction_size < context.storage_opts.compactor_fast_max_compact_task_size
610 && delete_key_count * 100
611 < context.storage_opts.compactor_fast_max_compact_delete_ratio as u64 * total_key_count
612 && compact_task.task_type == PbTaskType::Dynamic
613}
614
615pub async fn generate_splits_for_task(
616 compact_task: &mut CompactTask,
617 context: &CompactorContext,
618 optimize_by_copy_block: bool,
619) -> HummockResult<()> {
620 let input_ssts = compact_task.read_input_ssts().collect_vec();
621 let compaction_size = input_ssts_size(&input_ssts);
622
623 if !optimize_by_copy_block {
624 let splits = generate_splits(
625 &input_ssts,
626 compaction_size,
627 context,
628 compact_task.max_sub_compaction,
629 )
630 .await?;
631 if !splits.is_empty() {
632 compact_task.splits = splits;
633 }
634 return Ok(());
635 }
636
637 Ok(())
638}
639
640pub fn metrics_report_for_task(compact_task: &CompactTask, context: &CompactorContext) {
641 let group_label = compact_task.compaction_group_id.to_string();
642 let cur_level_label = compact_task.input_ssts[0].level_idx.to_string();
643
644 let (select_size, select_count) = read_sstable_size_and_count(
645 compact_task
646 .input_ssts
647 .iter()
648 .filter(|level| level.level_idx != compact_task.target_level)
649 .flat_map(|level| level.read_sstable_infos()),
650 );
651 let (target_level_read_bytes, target_count) = read_sstable_size_and_count(
652 compact_task
653 .input_ssts
654 .iter()
655 .filter(|level| level.level_idx == compact_task.target_level)
656 .flat_map(|level| level.read_sstable_infos()),
657 );
658
659 context
660 .compactor_metrics
661 .compact_read_current_level
662 .with_label_values(&[&group_label, &cur_level_label])
663 .inc_by(select_size);
664 context
665 .compactor_metrics
666 .compact_read_sstn_current_level
667 .with_label_values(&[&group_label, &cur_level_label])
668 .inc_by(select_count as u64);
669
670 let next_level_label = compact_task.target_level.to_string();
671 context
672 .compactor_metrics
673 .compact_read_next_level
674 .with_label_values(&[&group_label, &next_level_label])
675 .inc_by(target_level_read_bytes);
676 context
677 .compactor_metrics
678 .compact_read_sstn_next_level
679 .with_label_values(&[&group_label, &next_level_label])
680 .inc_by(target_count as u64);
681}
682
683fn read_sstable_size_and_count<'a>(
684 sstable_infos: impl IntoIterator<Item = &'a SstableInfo>,
685) -> (u64, usize) {
686 sstable_infos
687 .into_iter()
688 .fold((0, 0), |(size, count), table_info| {
689 (size + table_info.sst_size, count + 1)
690 })
691}
692
693pub fn calculate_task_parallelism(compact_task: &CompactTask, context: &CompactorContext) -> usize {
694 let input_ssts = compact_task.read_input_ssts().collect_vec();
695 let compaction_size = input_ssts_size(&input_ssts);
696 let optimize_by_copy_block =
697 optimize_by_copy_block_with_input(compact_task, context, &input_ssts, compaction_size);
698
699 if optimize_by_copy_block {
700 return 1;
701 }
702
703 let parallel_compact_size = (context.storage_opts.parallel_compact_size_mb as u64) << 20;
704 calculate_task_parallelism_impl(
705 context.compaction_executor.worker_num(),
706 parallel_compact_size,
707 compaction_size,
708 compact_task.max_sub_compaction,
709 )
710}
711
712fn input_ssts_size(input_ssts: &[&SstableInfo]) -> u64 {
713 input_ssts
714 .iter()
715 .map(|table_info| table_info.sst_size)
716 .sum()
717}
718
/// Computes how many sub-tasks a compaction of `compaction_size` bytes should use.
///
/// The raw value is `ceil(compaction_size / parallel_compact_size)`, capped by
/// `max_sub_compaction` and then by `worker_num`. It is 0 when `compaction_size` is 0, when
/// there are no workers, or when `max_sub_compaction` is 0.
///
/// A `parallel_compact_size` of 0 (e.g. `parallel_compact_size_mb = 0`) is treated as 1 byte
/// rather than panicking in `div_ceil`, so the caps alone bound the result.
pub fn calculate_task_parallelism_impl(
    worker_num: usize,
    parallel_compact_size: u64,
    compaction_size: u64,
    max_sub_compaction: u32,
) -> usize {
    let parallelism = compaction_size.div_ceil(parallel_compact_size.max(1));
    let capped = parallelism.min(u64::from(max_sub_compaction));
    // `capped <= u32::MAX`, so this only saturates on targets with a 16-bit `usize`.
    worker_num.min(usize::try_from(capped).unwrap_or(usize::MAX))
}
728
729#[cfg(test)]
730mod tests {
731 use std::sync::Arc;
732
733 use risingwave_common::catalog::TableId;
734 use risingwave_hummock_sdk::level::InputLevel;
735 use risingwave_hummock_sdk::sstable_info::SstableInfoInner;
736 use risingwave_pb::hummock::compact_task::PbTaskType;
737 use risingwave_pb::hummock::{
738 PbBloomFilterType, PbLevelType, PbSstableFilterLayout, PbSstableFilterType,
739 };
740
741 use super::{
742 CompactTask, CompactorContext, estimate_output_key_count_by_size, optimize_by_copy_block,
743 };
744 use crate::hummock::compactor::new_compaction_await_tree_reg_ref;
745 use crate::hummock::iterator::test_utils::mock_sstable_store;
746 use crate::monitor::CompactorMetrics;
747 use crate::opts::StorageOpts;
748
749 fn test_sstable(
750 table_id: TableId,
751 total_key_count: u64,
752 ) -> risingwave_hummock_sdk::sstable_info::SstableInfo {
753 SstableInfoInner {
754 object_id: 1.into(),
755 sst_id: 1.into(),
756 table_ids: vec![table_id],
757 total_key_count,
758 sst_size: 1024,
759 uncompressed_file_size: 1024,
760 bloom_filter_kind: PbBloomFilterType::Blocked,
761 filter_type: PbSstableFilterType::SstableFilterXor16,
762 ..Default::default()
763 }
764 .into()
765 }
766
767 async fn test_context() -> CompactorContext {
768 CompactorContext::new_local_compact_context(
769 Arc::new(StorageOpts::default()),
770 mock_sstable_store().await,
771 Arc::new(CompactorMetrics::unused()),
772 Some(new_compaction_await_tree_reg_ref(
773 await_tree::Config::default(),
774 )),
775 )
776 }
777
778 fn test_compact_task(
779 layout: PbSstableFilterLayout,
780 blocked_xor_filter_kv_count_threshold: Option<u64>,
781 ) -> CompactTask {
782 let table_id = TableId::new(1);
783 CompactTask {
784 input_ssts: vec![
785 InputLevel {
786 level_idx: 1,
787 level_type: PbLevelType::Nonoverlapping,
788 table_infos: vec![test_sstable(table_id, 10)],
789 },
790 InputLevel {
791 level_idx: 2,
792 level_type: PbLevelType::Nonoverlapping,
793 table_infos: vec![test_sstable(table_id, 10)],
794 },
795 ],
796 existing_table_ids: vec![table_id],
797 target_level: 2,
798 target_file_size: 1024,
799 task_type: PbTaskType::Dynamic,
800 sstable_filter_kind: PbSstableFilterType::SstableFilterXor16,
801 sstable_filter_layout: layout,
802 blocked_xor_filter_kv_count_threshold,
803 ..Default::default()
804 }
805 }
806
807 #[test]
808 fn test_estimate_output_key_count_by_size_scales_to_output_sst() {
809 let estimated_key_count =
810 estimate_output_key_count_by_size(1024 * 1024, 512 * 1024 * 1024, 128 * 1024 * 1024);
811
812 assert_eq!(estimated_key_count, 256 * 1024);
813 assert_eq!(estimate_output_key_count_by_size(100, 0, 0), 100);
814 }
815
816 #[tokio::test]
817 async fn test_optimize_by_copy_block_respects_plain_layout() {
818 let context = test_context().await;
819 let compact_task = test_compact_task(PbSstableFilterLayout::Plain, Some(1));
820
821 assert!(!optimize_by_copy_block(&compact_task, &context));
822 }
823
824 #[tokio::test]
825 async fn test_optimize_by_copy_block_respects_auto_threshold() {
826 let context = test_context().await;
827 let compact_task = test_compact_task(PbSstableFilterLayout::Auto, Some(1024));
828
829 assert!(!optimize_by_copy_block(&compact_task, &context));
830 }
831
832 #[tokio::test]
833 async fn test_optimize_by_copy_block_keeps_blocked_output_when_requested() {
834 let context = test_context().await;
835 let compact_task = test_compact_task(PbSstableFilterLayout::Blocked, Some(1024));
836
837 assert!(optimize_by_copy_block(&compact_task, &context));
838 }
839}