1use std::collections::{BTreeMap, HashMap};
16use std::marker::PhantomData;
17use std::ops::Bound;
18use std::sync::Arc;
19use std::sync::atomic::{AtomicU64, Ordering};
20
21use bytes::Bytes;
22use itertools::Itertools;
23use risingwave_common::catalog::TableId;
24use risingwave_common::config::meta::default::compaction_config;
25use risingwave_common::constants::hummock::CompactionFilterFlag;
26use risingwave_hummock_sdk::compact_task::CompactTask;
27use risingwave_hummock_sdk::key::FullKey;
28use risingwave_hummock_sdk::key_range::KeyRange;
29use risingwave_hummock_sdk::sstable_info::SstableInfo;
30use risingwave_hummock_sdk::table_stats::TableStatsMap;
31use risingwave_hummock_sdk::{EpochWithGap, KeyComparator, can_concat};
32use risingwave_pb::hummock::compact_task::PbTaskType;
33use risingwave_pb::hummock::{
34 PbLevelType, PbSstableFilterLayout, PbSstableFilterType, PbTableSchema,
35};
36use tokio::time::Instant;
37
38pub use super::context::CompactorContext;
39use crate::compaction_catalog_manager::CompactionCatalogAgentRef;
40use crate::hummock::compactor::{
41 ConcatSstableIterator, MultiCompactionFilter, TaskProgress, TtlCompactionFilter,
42};
43use crate::hummock::iterator::{
44 Forward, HummockIterator, MergeIterator, NonPkPrefixSkipWatermarkIterator,
45 NonPkPrefixSkipWatermarkState, PkPrefixSkipWatermarkIterator, PkPrefixSkipWatermarkState,
46 UserIterator,
47};
48use crate::hummock::multi_builder::TableBuilderFactory;
49use crate::hummock::{
50 CachePolicy, FilterBuilder, GetObjectId, HummockResult, MemoryLimiter, SstableBuilder,
51 SstableBuilderOptions, SstableWriterFactory, SstableWriterOptions,
52};
53use crate::monitor::StoreLocalStatistic;
54
/// Factory that opens a new [`SstableBuilder`] per call, allocating a fresh SST object id from
/// `object_id_getter` and a writer from `sstable_writer_factory` each time.
pub struct RemoteBuilderFactory<W: SstableWriterFactory, F: FilterBuilder> {
    /// Source of new SST object ids; queried once for every builder that is opened.
    pub object_id_getter: Arc<dyn GetObjectId>,
    /// Memory limiter handed to every builder created by this factory.
    pub limiter: Arc<MemoryLimiter>,
    /// Builder options; also used to derive the writer capacity hint and the filter options.
    pub options: SstableBuilderOptions,
    /// Cache policy forwarded to the SST writer options.
    pub policy: CachePolicy,
    /// Accumulated time, in microseconds, spent waiting for new object ids.
    pub remote_rpc_cost: Arc<AtomicU64>,
    /// Catalog agent cloned into each builder.
    pub compaction_catalog_agent_ref: CompactionCatalogAgentRef,
    /// Factory that creates the SST writer for each newly allocated object id.
    pub sstable_writer_factory: W,
    /// Binds the factory to the filter builder type `F` without storing a value of it.
    pub _phantom: PhantomData<F>,
}
65
66#[async_trait::async_trait]
67impl<W: SstableWriterFactory, F: FilterBuilder> TableBuilderFactory for RemoteBuilderFactory<W, F> {
68 type Filter = F;
69 type Writer = W::Writer;
70
71 async fn open_builder(&mut self) -> HummockResult<SstableBuilder<Self::Writer, Self::Filter>> {
72 let timer = Instant::now();
73 let table_id = self.object_id_getter.get_new_sst_object_id().await?;
74 let cost = (timer.elapsed().as_secs_f64() * 1000000.0).round() as u64;
75 self.remote_rpc_cost.fetch_add(cost, Ordering::Relaxed);
76 let writer_options = SstableWriterOptions {
77 capacity_hint: Some(self.options.capacity + self.options.block_capacity),
78 tracker: None,
79 policy: self.policy,
80 };
81 let writer = self
82 .sstable_writer_factory
83 .create_sst_writer(table_id, writer_options)
84 .await?;
85 let builder = SstableBuilder::new(
86 table_id,
87 writer,
88 Self::Filter::create(self.options.filter_builder_options()),
89 self.options.clone(),
90 self.compaction_catalog_agent_ref.clone(),
91 Some(self.limiter.clone()),
92 );
93 Ok(builder)
94 }
95}
96
/// Counters describing one compaction run.
// NOTE(review): the code that fills these fields is not in this file; the field notes below are
// read off the names and types and should be confirmed against the producer.
#[derive(Default, Debug)]
pub struct CompactionStatistics {
    /// Per-table statistics delta; presumably covers the entries dropped by the compaction.
    pub delta_drop_stat: TableStatsMap,

    /// Presumably the number of keys visited by the compaction iterator.
    pub iter_total_key_counts: u64,
    /// Presumably the number of visited keys that were dropped.
    pub iter_drop_key_counts: u64,
}
107
/// Per-task configuration for a compaction.
// NOTE(review): the consumers of these fields are outside this file; the semantics noted below
// are read off the names and types and should be confirmed against the code that uses them.
#[derive(Clone, Default)]
pub struct TaskConfig {
    /// Key range associated with the task.
    pub(crate) key_range: KeyRange,
    /// Cache policy associated with the task.
    pub(crate) cache_policy: CachePolicy,
    /// Presumably whether delete tombstones may be garbage-collected — confirm.
    pub(crate) gc_delete_keys: bool,
    /// Presumably whether older versions of a key are kept — confirm.
    pub(crate) retain_multiple_version: bool,
    /// Filter layout requested for the SSTs this task produces.
    pub(crate) sstable_filter_layout: PbSstableFilterLayout,
    /// Filter type requested for the SSTs this task produces.
    pub(crate) sstable_filter_type: PbSstableFilterType,

    /// Per-table vnode partition count, keyed by table id.
    pub(crate) table_vnode_partition: BTreeMap<TableId, u32>,
    /// Table schemas keyed by table id.
    pub(crate) table_schemas: HashMap<TableId, PbTableSchema>,
    /// When `true`, the drop-column optimization is turned off.
    pub(crate) disable_drop_column_optimization: bool,
}
126
127impl TaskConfig {
128 pub fn for_test(
130 key_range: KeyRange,
131 cache_policy: CachePolicy,
132 gc_delete_keys: bool,
133 sstable_filter_layout: PbSstableFilterLayout,
134 table_schemas: HashMap<TableId, PbTableSchema>,
135 ) -> Self {
136 Self {
137 key_range,
138 cache_policy,
139 gc_delete_keys,
140 retain_multiple_version: false,
141 sstable_filter_layout,
142 sstable_filter_type: PbSstableFilterType::SstableFilterXor16,
143 table_vnode_partition: BTreeMap::default(),
144 table_schemas,
145 disable_drop_column_optimization: false,
146 }
147 }
148
149 pub fn with_disable_drop_column_optimization(mut self, disable: bool) -> Self {
150 self.disable_drop_column_optimization = disable;
151 self
152 }
153}
154
155pub fn build_multi_compaction_filter(compact_task: &CompactTask) -> MultiCompactionFilter {
156 let mut multi_filter = MultiCompactionFilter::default();
157 let compaction_filter_flag =
158 CompactionFilterFlag::from_bits(compact_task.compaction_filter_mask).unwrap_or_default();
159 if compaction_filter_flag.contains(CompactionFilterFlag::TTL) {
163 let id_to_ttl = compact_task
164 .table_options
165 .iter()
166 .filter_map(|(id, option)| {
167 option
168 .retention_seconds
169 .and_then(|ttl| if ttl > 0 { Some((*id, ttl)) } else { None })
170 })
171 .collect();
172
173 let ttl_filter = Box::new(TtlCompactionFilter::new(
174 id_to_ttl,
175 compact_task.current_epoch_time,
176 ));
177 multi_filter.register(ttl_filter);
178 }
179
180 multi_filter
181}
182
183fn generate_splits_fast(
184 sstable_infos: &[&SstableInfo],
185 compaction_size: u64,
186 context: &CompactorContext,
187 max_sub_compaction: u32,
188) -> Vec<KeyRange> {
189 let worker_num = context.compaction_executor.worker_num();
190 let parallel_compact_size = (context.storage_opts.parallel_compact_size_mb as u64) << 20;
191
192 let parallelism = calculate_task_parallelism_impl(
193 worker_num,
194 parallel_compact_size,
195 compaction_size,
196 max_sub_compaction,
197 );
198 let mut indexes = vec![];
199 for sst in sstable_infos {
200 let key_range = &sst.key_range;
201 indexes.push(
202 FullKey {
203 user_key: FullKey::decode(&key_range.left).user_key,
204 epoch_with_gap: EpochWithGap::new_max_epoch(),
205 }
206 .encode(),
207 );
208 indexes.push(
209 FullKey {
210 user_key: FullKey::decode(&key_range.right).user_key,
211 epoch_with_gap: EpochWithGap::new_max_epoch(),
212 }
213 .encode(),
214 );
215 }
216 indexes.sort_by(|a, b| KeyComparator::compare_encoded_full_key(a.as_ref(), b.as_ref()));
217 indexes.dedup();
218 if indexes.len() <= parallelism {
219 return vec![];
220 }
221
222 let mut splits = vec![];
223 splits.push(KeyRange::default());
224 let parallel_key_count = indexes.len() / parallelism;
225 let mut last_split_key_count = 0;
226 for key in indexes {
227 if last_split_key_count >= parallel_key_count {
228 splits.last_mut().unwrap().right = Bytes::from(key.clone());
229 splits.push(KeyRange::new(Bytes::from(key.clone()), Bytes::default()));
230 last_split_key_count = 0;
231 }
232 last_split_key_count += 1;
233 }
234
235 splits
236}
237
238pub async fn generate_splits(
239 sstable_infos: &[&SstableInfo],
240 compaction_size: u64,
241 context: &CompactorContext,
242 max_sub_compaction: u32,
243) -> HummockResult<Vec<KeyRange>> {
244 let parallel_compact_size = (context.storage_opts.parallel_compact_size_mb as u64) << 20;
245 if compaction_size > parallel_compact_size {
246 if sstable_infos.len() > context.storage_opts.compactor_max_preload_meta_file_count {
247 return Ok(generate_splits_fast(
248 sstable_infos,
249 compaction_size,
250 context,
251 max_sub_compaction,
252 ));
253 }
254 let mut indexes = vec![];
255 for sstable_info in sstable_infos {
257 let sstable = context
258 .sstable_store
259 .sstable(sstable_info, &mut StoreLocalStatistic::default())
260 .await?;
261 indexes.extend(sstable.meta.block_metas.iter().map(|block| {
262 let data_size = block.len;
263 let full_key = FullKey {
264 user_key: FullKey::decode(&block.smallest_key).user_key,
265 epoch_with_gap: EpochWithGap::new_max_epoch(),
266 }
267 .encode();
268 (data_size as u64, full_key)
269 }));
270 }
271 indexes.sort_by(|a, b| KeyComparator::compare_encoded_full_key(a.1.as_ref(), b.1.as_ref()));
273 let mut splits = vec![];
274 splits.push(KeyRange::default());
275
276 let parallelism = calculate_task_parallelism_impl(
277 context.compaction_executor.worker_num(),
278 parallel_compact_size,
279 compaction_size,
280 max_sub_compaction,
281 );
282
283 let sub_compaction_data_size =
284 std::cmp::max(compaction_size / parallelism as u64, parallel_compact_size);
285
286 if parallelism > 1 {
287 let mut last_buffer_size = 0;
288 let mut last_key: Vec<u8> = vec![];
289 let mut remaining_size = indexes.iter().map(|block| block.0).sum::<u64>();
290 for (data_size, key) in indexes {
291 if last_buffer_size >= sub_compaction_data_size
292 && !last_key.eq(&key)
293 && remaining_size > parallel_compact_size
294 {
295 splits.last_mut().unwrap().right = Bytes::from(key.clone());
296 splits.push(KeyRange::new(Bytes::from(key.clone()), Bytes::default()));
297 last_buffer_size = data_size;
298 } else {
299 last_buffer_size += data_size;
300 }
301 remaining_size -= data_size;
302 last_key = key;
303 }
304 return Ok(splits);
305 }
306 }
307
308 Ok(vec![])
309}
310
311pub fn estimate_task_output_capacity(context: CompactorContext, task: &CompactTask) -> usize {
312 let max_target_file_size = context.storage_opts.sstable_size_mb as usize * (1 << 20);
313 let total_input_uncompressed_file_size = task
314 .read_input_ssts()
315 .map(|table| table.uncompressed_file_size)
316 .sum::<u64>();
317
318 let capacity = std::cmp::min(task.target_file_size as usize, max_target_file_size);
319 std::cmp::min(capacity, total_input_uncompressed_file_size as usize)
320}
321
/// Estimates how many keys land in one output SST of `output_capacity` bytes, assuming keys are
/// spread evenly over `total_size` bytes of input.
///
/// * no input keys → `0`
/// * input size unknown (`0`) → all input keys
/// * zero output capacity → `0`
/// * otherwise `ceil(total_key_count * output_capacity / total_size)`, capped at
///   `total_key_count`
///
/// Results that do not fit in `usize` saturate to `usize::MAX`.
pub fn estimate_output_key_count_by_size(
    total_key_count: u64,
    total_size: u64,
    output_capacity: usize,
) -> usize {
    match (total_key_count, total_size, output_capacity) {
        (0, _, _) => 0,
        (keys, 0, _) => keys.try_into().unwrap_or(usize::MAX),
        (_, _, 0) => 0,
        (keys, size, capacity) => {
            // 128-bit arithmetic so the product cannot overflow.
            let keys = keys as u128;
            let proportional = (keys * capacity as u128).div_ceil(size as u128);
            proportional.min(keys).try_into().unwrap_or(usize::MAX)
        }
    }
}
345
346pub fn estimate_output_key_count_for_input_ssts<'a>(
347 input_ssts: impl Iterator<Item = &'a SstableInfo>,
348 output_capacity: usize,
349) -> usize {
350 let (total_key_count, total_uncompressed_size) =
351 input_ssts.fold((0u64, 0u64), |(key_count, size), sst| {
352 (
353 key_count + sst.total_key_count,
354 size + sst.uncompressed_file_size,
355 )
356 });
357
358 estimate_output_key_count_by_size(total_key_count, total_uncompressed_size, output_capacity)
359}
360
361pub fn estimate_output_key_count_for_task(task: &CompactTask, output_capacity: usize) -> usize {
362 estimate_output_key_count_for_input_ssts(task.read_input_ssts(), output_capacity)
363}
364
365pub fn blocked_xor_filter_key_count_threshold(
366 blocked_xor_filter_kv_count_threshold: Option<u64>,
367) -> usize {
368 blocked_xor_filter_kv_count_threshold
369 .unwrap_or(compaction_config::DEFAULT_BLOCKED_XOR_FILTER_KV_COUNT_THRESHOLD)
370 .try_into()
371 .unwrap_or(usize::MAX)
372}
373
374pub async fn check_compaction_result(
376 compact_task: &CompactTask,
377 context: CompactorContext,
378 compaction_catalog_agent_ref: CompactionCatalogAgentRef,
379) -> HummockResult<bool> {
380 if compact_task.contains_ttl() {
382 return Ok(true);
383 }
384
385 let mut table_iters = Vec::new();
386
387 for level in &compact_task.input_ssts {
388 if level.level_type == PbLevelType::Nonoverlapping {
389 let tables = level.read_sstable_infos().cloned().collect_vec();
390 if tables.is_empty() {
391 continue;
392 }
393 debug_assert!(can_concat(&tables));
394
395 table_iters.push(ConcatSstableIterator::new(
396 tables,
397 KeyRange::inf(),
398 context.sstable_store.clone(),
399 Arc::new(TaskProgress::default()),
400 context.storage_opts.compactor_iter_max_io_retry_times,
401 ));
402 } else {
403 for table_info in level.read_sstable_infos().cloned() {
404 table_iters.push(ConcatSstableIterator::new(
405 vec![table_info],
406 KeyRange::inf(),
407 context.sstable_store.clone(),
408 Arc::new(TaskProgress::default()),
409 context.storage_opts.compactor_iter_max_io_retry_times,
410 ));
411 }
412 }
413 }
414
415 let iter = MergeIterator::for_compactor(table_iters);
416 let left_iter = {
417 let skip_watermark_iter = PkPrefixSkipWatermarkIterator::new(
418 iter,
419 PkPrefixSkipWatermarkState::from_safe_epoch_watermarks(
420 compact_task.pk_prefix_table_watermarks.clone(),
421 ),
422 );
423
424 let combine_iter = NonPkPrefixSkipWatermarkIterator::new(
425 skip_watermark_iter,
426 NonPkPrefixSkipWatermarkState::from_safe_epoch_watermarks(
427 compact_task.non_pk_prefix_table_watermarks.clone(),
428 compaction_catalog_agent_ref.clone(),
429 ),
430 );
431
432 UserIterator::new(
433 combine_iter,
434 (Bound::Unbounded, Bound::Unbounded),
435 u64::MAX,
436 0,
437 None,
438 )
439 };
440 let iter = ConcatSstableIterator::new(
441 compact_task.sorted_output_ssts.clone(),
442 KeyRange::inf(),
443 context.sstable_store.clone(),
444 Arc::new(TaskProgress::default()),
445 context.storage_opts.compactor_iter_max_io_retry_times,
446 );
447 let right_iter = {
448 let skip_watermark_iter = PkPrefixSkipWatermarkIterator::new(
449 iter,
450 PkPrefixSkipWatermarkState::from_safe_epoch_watermarks(
451 compact_task.pk_prefix_table_watermarks.clone(),
452 ),
453 );
454
455 let combine_iter = NonPkPrefixSkipWatermarkIterator::new(
456 skip_watermark_iter,
457 NonPkPrefixSkipWatermarkState::from_safe_epoch_watermarks(
458 compact_task.non_pk_prefix_table_watermarks.clone(),
459 compaction_catalog_agent_ref,
460 ),
461 );
462
463 UserIterator::new(
464 combine_iter,
465 (Bound::Unbounded, Bound::Unbounded),
466 u64::MAX,
467 0,
468 None,
469 )
470 };
471
472 check_result(left_iter, right_iter).await
473}
474
475pub async fn check_flush_result<I: HummockIterator<Direction = Forward>>(
476 left_iter: UserIterator<I>,
477 sort_ssts: Vec<SstableInfo>,
478 context: CompactorContext,
479) -> HummockResult<bool> {
480 let iter = ConcatSstableIterator::new(
481 sort_ssts,
482 KeyRange::inf(),
483 context.sstable_store.clone(),
484 Arc::new(TaskProgress::default()),
485 0,
486 );
487 let right_iter = UserIterator::new(
488 iter,
489 (Bound::Unbounded, Bound::Unbounded),
490 u64::MAX,
491 0,
492 None,
493 );
494 check_result(left_iter, right_iter).await
495}
496
497async fn check_result<
498 I1: HummockIterator<Direction = Forward>,
499 I2: HummockIterator<Direction = Forward>,
500>(
501 mut left_iter: UserIterator<I1>,
502 mut right_iter: UserIterator<I2>,
503) -> HummockResult<bool> {
504 left_iter.rewind().await?;
505 right_iter.rewind().await?;
506 let mut right_count = 0;
507 let mut left_count = 0;
508 while left_iter.is_valid() && right_iter.is_valid() {
509 if left_iter.key() != right_iter.key() {
510 tracing::error!(
511 "The key of input and output not equal. key: {:?} vs {:?}",
512 left_iter.key(),
513 right_iter.key()
514 );
515 return Ok(false);
516 }
517 if left_iter.value() != right_iter.value() {
518 tracing::error!(
519 "The value of input and output not equal. key: {:?}, value: {:?} vs {:?}",
520 left_iter.key(),
521 left_iter.value(),
522 right_iter.value()
523 );
524 return Ok(false);
525 }
526 left_iter.next().await?;
527 right_iter.next().await?;
528 left_count += 1;
529 right_count += 1;
530 }
531 while left_iter.is_valid() {
532 left_count += 1;
533 left_iter.next().await?;
534 }
535 while right_iter.is_valid() {
536 right_count += 1;
537 right_iter.next().await?;
538 }
539 if left_count != right_count {
540 tracing::error!(
541 "The key count of input and output not equal: {} vs {}",
542 left_count,
543 right_count
544 );
545 return Ok(false);
546 }
547 Ok(true)
548}
549
550pub fn optimize_by_copy_block(compact_task: &CompactTask, context: &CompactorContext) -> bool {
551 let input_ssts = compact_task.read_input_ssts().collect_vec();
552 let compaction_size = input_ssts_size(&input_ssts);
553 optimize_by_copy_block_with_input(compact_task, context, &input_ssts, compaction_size)
554}
555
556fn optimize_by_copy_block_with_input(
557 compact_task: &CompactTask,
558 context: &CompactorContext,
559 input_ssts: &[&SstableInfo],
560 compaction_size: u64,
561) -> bool {
562 let all_ssts_are_blocked_filter = input_ssts
563 .iter()
564 .all(|table_info| table_info.filter_layout == PbSstableFilterLayout::Blocked);
565 let current_filter_type = compact_task.sstable_filter_type;
566 let all_ssts_match_filter_type = input_ssts
567 .iter()
568 .all(|table_info| table_info.filter_type == current_filter_type);
569 let output_capacity = estimate_task_output_capacity(context.clone(), compact_task);
574 let estimated_output_key_count =
575 estimate_output_key_count_for_input_ssts(input_ssts.iter().copied(), output_capacity);
576 let output_filter_layout =
577 compact_task.sstable_filter_layout_for_output(estimated_output_key_count as u64);
578
579 let delete_key_count = input_ssts
580 .iter()
581 .map(|table_info| table_info.stale_key_count + table_info.range_tombstone_count)
582 .sum::<u64>();
583 let total_key_count = input_ssts
584 .iter()
585 .map(|table_info| table_info.total_key_count)
586 .sum::<u64>();
587
588 let single_table = compact_task.get_table_ids_from_input_ssts().count() == 1;
589 context.storage_opts.enable_fast_compaction
590 && matches!(
591 current_filter_type,
592 PbSstableFilterType::SstableFilterXor8 | PbSstableFilterType::SstableFilterXor16
593 )
594 && all_ssts_are_blocked_filter
595 && all_ssts_match_filter_type
596 && output_filter_layout == PbSstableFilterLayout::Blocked
597 && !compact_task.contains_range_tombstone()
598 && !compact_task.contains_ttl()
599 && !compact_task.contains_split_sst()
600 && single_table
601 && compact_task.target_level > 0
602 && compact_task.input_ssts.len() == 2
603 && compaction_size < context.storage_opts.compactor_fast_max_compact_task_size
604 && delete_key_count * 100
605 < context.storage_opts.compactor_fast_max_compact_delete_ratio as u64 * total_key_count
606 && compact_task.task_type == PbTaskType::Dynamic
607}
608
609pub async fn generate_splits_for_task(
610 compact_task: &mut CompactTask,
611 context: &CompactorContext,
612 optimize_by_copy_block: bool,
613) -> HummockResult<()> {
614 let input_ssts = compact_task.read_input_ssts().collect_vec();
615 let compaction_size = input_ssts_size(&input_ssts);
616
617 if !optimize_by_copy_block {
618 let splits = generate_splits(
619 &input_ssts,
620 compaction_size,
621 context,
622 compact_task.max_sub_compaction,
623 )
624 .await?;
625 if !splits.is_empty() {
626 compact_task.splits = splits;
627 }
628 return Ok(());
629 }
630
631 Ok(())
632}
633
634pub fn metrics_report_for_task(compact_task: &CompactTask, context: &CompactorContext) {
635 let group_label = compact_task.compaction_group_id.to_string();
636 let cur_level_label = compact_task.input_ssts[0].level_idx.to_string();
637
638 let (select_size, select_count) = read_sstable_size_and_count(
639 compact_task
640 .input_ssts
641 .iter()
642 .filter(|level| level.level_idx != compact_task.target_level)
643 .flat_map(|level| level.read_sstable_infos()),
644 );
645 let (target_level_read_bytes, target_count) = read_sstable_size_and_count(
646 compact_task
647 .input_ssts
648 .iter()
649 .filter(|level| level.level_idx == compact_task.target_level)
650 .flat_map(|level| level.read_sstable_infos()),
651 );
652
653 context
654 .compactor_metrics
655 .compact_read_current_level
656 .with_label_values(&[&group_label, &cur_level_label])
657 .inc_by(select_size);
658 context
659 .compactor_metrics
660 .compact_read_sstn_current_level
661 .with_label_values(&[&group_label, &cur_level_label])
662 .inc_by(select_count as u64);
663
664 let next_level_label = compact_task.target_level.to_string();
665 context
666 .compactor_metrics
667 .compact_read_next_level
668 .with_label_values(&[&group_label, &next_level_label])
669 .inc_by(target_level_read_bytes);
670 context
671 .compactor_metrics
672 .compact_read_sstn_next_level
673 .with_label_values(&[&group_label, &next_level_label])
674 .inc_by(target_count as u64);
675}
676
677fn read_sstable_size_and_count<'a>(
678 sstable_infos: impl IntoIterator<Item = &'a SstableInfo>,
679) -> (u64, usize) {
680 sstable_infos
681 .into_iter()
682 .fold((0, 0), |(size, count), table_info| {
683 (size + table_info.sst_size, count + 1)
684 })
685}
686
687pub fn calculate_task_parallelism(compact_task: &CompactTask, context: &CompactorContext) -> usize {
688 let input_ssts = compact_task.read_input_ssts().collect_vec();
689 let compaction_size = input_ssts_size(&input_ssts);
690 let optimize_by_copy_block =
691 optimize_by_copy_block_with_input(compact_task, context, &input_ssts, compaction_size);
692
693 if optimize_by_copy_block {
694 return 1;
695 }
696
697 let parallel_compact_size = (context.storage_opts.parallel_compact_size_mb as u64) << 20;
698 calculate_task_parallelism_impl(
699 context.compaction_executor.worker_num(),
700 parallel_compact_size,
701 compaction_size,
702 compact_task.max_sub_compaction,
703 )
704}
705
706fn input_ssts_size(input_ssts: &[&SstableInfo]) -> u64 {
707 input_ssts
708 .iter()
709 .map(|table_info| table_info.sst_size)
710 .sum()
711}
712
/// Computes the number of sub-compactions for a task of `compaction_size` bytes.
///
/// The size-based parallelism is `ceil(compaction_size / parallel_compact_size)`; it is capped
/// by `max_sub_compaction` and by `worker_num`. The result is `0` when `compaction_size`,
/// `max_sub_compaction` or `worker_num` is `0`.
///
/// A `parallel_compact_size` of `0` is treated as one byte, so a zero-sized configuration is
/// bounded only by `max_sub_compaction` and `worker_num` instead of panicking on a division by
/// zero.
pub fn calculate_task_parallelism_impl(
    worker_num: usize,
    parallel_compact_size: u64,
    compaction_size: u64,
    max_sub_compaction: u32,
) -> usize {
    let size_based = compaction_size.div_ceil(parallel_compact_size.max(1));
    let capped = size_based.min(u64::from(max_sub_compaction));
    // `capped` is at most `u32::MAX`; saturate rather than truncate if `usize` is narrower.
    worker_num.min(usize::try_from(capped).unwrap_or(usize::MAX))
}
722
// Unit tests for the key-count estimation helper and the copy-block eligibility check.
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use risingwave_common::catalog::TableId;
    use risingwave_hummock_sdk::level::InputLevel;
    use risingwave_hummock_sdk::sstable_info::SstableInfoInner;
    use risingwave_pb::hummock::compact_task::PbTaskType;
    use risingwave_pb::hummock::{PbLevelType, PbSstableFilterLayout, PbSstableFilterType};

    use super::{
        CompactTask, CompactorContext, estimate_output_key_count_by_size, optimize_by_copy_block,
    };
    use crate::hummock::compactor::new_compaction_await_tree_reg_ref;
    use crate::hummock::iterator::test_utils::mock_sstable_store;
    use crate::monitor::CompactorMetrics;
    use crate::opts::StorageOpts;

    /// Builds a 1 KiB SST (compressed and uncompressed) for `table_id` with a blocked filter
    /// layout, the given key count and the given filter type. Object id and SST id are both 1.
    fn test_sstable(
        table_id: TableId,
        total_key_count: u64,
        filter_type: PbSstableFilterType,
    ) -> risingwave_hummock_sdk::sstable_info::SstableInfo {
        SstableInfoInner {
            object_id: 1.into(),
            sst_id: 1.into(),
            table_ids: vec![table_id],
            total_key_count,
            sst_size: 1024,
            uncompressed_file_size: 1024,
            filter_type,
            filter_layout: PbSstableFilterLayout::Blocked,
            ..Default::default()
        }
        .into()
    }

    /// Builds a local compactor context with default storage options, a mock SST store and
    /// unused metrics.
    async fn test_context() -> CompactorContext {
        CompactorContext::new_local_compact_context(
            Arc::new(StorageOpts::default()),
            mock_sstable_store().await,
            Arc::new(CompactorMetrics::unused()),
            Some(new_compaction_await_tree_reg_ref(
                await_tree::Config::default(),
            )),
        )
    }

    /// Builds a `Dynamic` task for table 1 that compacts one 10-key SST from level 1 and one
    /// from level 2 into level 2, with the given output filter layout, blocked-xor threshold
    /// and filter type.
    fn test_compact_task(
        layout: PbSstableFilterLayout,
        blocked_xor_filter_kv_count_threshold: Option<u64>,
        filter_type: PbSstableFilterType,
    ) -> CompactTask {
        let table_id = TableId::new(1);
        CompactTask {
            input_ssts: vec![
                InputLevel {
                    level_idx: 1,
                    level_type: PbLevelType::Nonoverlapping,
                    table_infos: vec![test_sstable(table_id, 10, filter_type)],
                },
                InputLevel {
                    level_idx: 2,
                    level_type: PbLevelType::Nonoverlapping,
                    table_infos: vec![test_sstable(table_id, 10, filter_type)],
                },
            ],
            existing_table_ids: vec![table_id],
            target_level: 2,
            target_file_size: 1024,
            task_type: PbTaskType::Dynamic,
            sstable_filter_type: filter_type,
            sstable_filter_layout: layout,
            blocked_xor_filter_kv_count_threshold,
            ..Default::default()
        }
    }

    // The estimate scales the key count by output capacity / input size: 1 Mi keys over
    // 512 MiB with a 128 MiB output gives 256 Ki keys. With an unknown (zero) input size all
    // keys are returned.
    #[test]
    fn test_estimate_output_key_count_by_size_scales_to_output_sst() {
        let estimated_key_count =
            estimate_output_key_count_by_size(1024 * 1024, 512 * 1024 * 1024, 128 * 1024 * 1024);

        assert_eq!(estimated_key_count, 256 * 1024);
        assert_eq!(estimate_output_key_count_by_size(100, 0, 0), 100);
    }

    // Copy-block must be rejected for a `Plain` output layout, and for an `Auto` layout with
    // this task's threshold of 1024 (presumably because the estimated key count stays below
    // the threshold, so the output layout does not resolve to `Blocked` — confirm).
    #[tokio::test]
    async fn test_optimize_by_copy_block_respects_layout_policy() {
        let context = test_context().await;
        let compact_task = test_compact_task(
            PbSstableFilterLayout::Plain,
            Some(1),
            PbSstableFilterType::SstableFilterXor16,
        );

        assert!(!optimize_by_copy_block(&compact_task, &context));

        let compact_task = test_compact_task(
            PbSstableFilterLayout::Auto,
            Some(1024),
            PbSstableFilterType::SstableFilterXor16,
        );

        assert!(!optimize_by_copy_block(&compact_task, &context));
    }

    // With a `Blocked` output layout, copy-block is accepted for both xor16 and xor8 filters.
    #[tokio::test]
    async fn test_optimize_by_copy_block_supports_blocked_xor_filters() {
        let context = test_context().await;
        let compact_task = test_compact_task(
            PbSstableFilterLayout::Blocked,
            Some(1024),
            PbSstableFilterType::SstableFilterXor16,
        );

        assert!(optimize_by_copy_block(&compact_task, &context));

        let compact_task = test_compact_task(
            PbSstableFilterLayout::Blocked,
            Some(1024),
            PbSstableFilterType::SstableFilterXor8,
        );

        assert!(optimize_by_copy_block(&compact_task, &context));
    }
}