use std::collections::{BTreeMap, HashMap, HashSet};
use std::marker::PhantomData;
use std::ops::Bound;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};

use bytes::Bytes;
use itertools::Itertools;
use risingwave_common::catalog::TableId;
use risingwave_common::constants::hummock::CompactionFilterFlag;
use risingwave_hummock_sdk::compact_task::CompactTask;
use risingwave_hummock_sdk::compaction_group::StateTableId;
use risingwave_hummock_sdk::key::FullKey;
use risingwave_hummock_sdk::key_range::KeyRange;
use risingwave_hummock_sdk::sstable_info::SstableInfo;
use risingwave_hummock_sdk::table_stats::TableStatsMap;
use risingwave_hummock_sdk::{EpochWithGap, KeyComparator, can_concat};
use risingwave_pb::hummock::compact_task::PbTaskType;
use risingwave_pb::hummock::{BloomFilterType, PbLevelType, PbTableSchema};
use tokio::time::Instant;

pub use super::context::CompactorContext;
use crate::compaction_catalog_manager::CompactionCatalogAgentRef;
use crate::hummock::compactor::{
    ConcatSstableIterator, MultiCompactionFilter, StateCleanUpCompactionFilter, TaskProgress,
    TtlCompactionFilter,
};
use crate::hummock::iterator::{
    Forward, HummockIterator, MergeIterator, NonPkPrefixSkipWatermarkIterator,
    NonPkPrefixSkipWatermarkState, PkPrefixSkipWatermarkIterator, PkPrefixSkipWatermarkState,
    UserIterator,
};
use crate::hummock::multi_builder::TableBuilderFactory;
use crate::hummock::sstable::DEFAULT_ENTRY_SIZE;
use crate::hummock::{
    CachePolicy, FilterBuilder, GetObjectId, HummockResult, MemoryLimiter, SstableBuilder,
    SstableBuilderOptions, SstableWriterFactory, SstableWriterOptions,
};
use crate::monitor::StoreLocalStatistic;

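/// Factory that opens remote [`SstableBuilder`]s for a compaction task: each builder obtains a
/// new SST object id via `object_id_getter` (the RPC latency is accumulated into
/// `remote_rpc_cost`, in microseconds) and writes through `sstable_writer_factory`.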
pub struct RemoteBuilderFactory<W: SstableWriterFactory, F: FilterBuilder> {
    pub object_id_getter: Arc<dyn GetObjectId>,
    pub limiter: Arc<MemoryLimiter>,
    pub options: SstableBuilderOptions,
    pub policy: CachePolicy,
    pub remote_rpc_cost: Arc<AtomicU64>,
    pub compaction_catalog_agent_ref: CompactionCatalogAgentRef,
    pub sstable_writer_factory: W,
    pub _phantom: PhantomData<F>,
}

#[async_trait::async_trait]
impl<W: SstableWriterFactory, F: FilterBuilder> TableBuilderFactory for RemoteBuilderFactory<W, F> {
    type Filter = F;
    type Writer = W::Writer;

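    /// Requests a fresh SST object id, records the RPC cost, and wires the writer and filter
    /// builder into a new `SstableBuilder`.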
    async fn open_builder(&mut self) -> HummockResult<SstableBuilder<Self::Writer, Self::Filter>> {
        let timer = Instant::now();
        // The id returned here is an SST object id, not a table id.
        let object_id = self.object_id_getter.get_new_sst_object_id().await?;
        let cost = (timer.elapsed().as_secs_f64() * 1_000_000.0).round() as u64;
        self.remote_rpc_cost.fetch_add(cost, Ordering::Relaxed);
        let writer_options = SstableWriterOptions {
            capacity_hint: Some(self.options.capacity + self.options.block_capacity),
            tracker: None,
            policy: self.policy,
        };
        let writer = self
            .sstable_writer_factory
            .create_sst_writer(object_id, writer_options)
            .await?;
        let builder = SstableBuilder::new(
            object_id,
            writer,
            Self::Filter::create(
                self.options.bloom_false_positive,
                self.options.capacity / DEFAULT_ENTRY_SIZE + 1,
            ),
            self.options.clone(),
            self.compaction_catalog_agent_ref.clone(),
            Some(self.limiter.clone()),
        );
        Ok(builder)
    }
}

#[derive(Default, Debug)]
pub struct CompactionStatistics {
    /// Per-table stats deltas for the data dropped during compaction, reported as metrics.
    pub delta_drop_stat: TableStatsMap,

    /// Total number of keys seen by the compaction iterator.
    pub iter_total_key_counts: u64,
    /// Number of iterated keys that were dropped.
    pub iter_drop_key_counts: u64,
}

impl CompactionStatistics {
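    /// Ratio of dropped keys to iterated keys, or `None` if nothing was iterated. Note the
    /// integer division: since `iter_drop_key_counts <= iter_total_key_counts`, the result is
    /// `0` unless every iterated key was dropped.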
    #[allow(dead_code)]
    fn delete_ratio(&self) -> Option<u64> {
        if self.iter_total_key_counts == 0 {
            return None;
        }

        Some(self.iter_drop_key_counts / self.iter_total_key_counts)
    }
}

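/// Per-task configuration assembled by the compactor runner and shared by all sub-compactions of
/// one task.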
#[derive(Clone, Default)]
pub struct TaskConfig {
    pub(crate) key_range: KeyRange,
    pub(crate) cache_policy: CachePolicy,
    pub(crate) gc_delete_keys: bool,
    pub(crate) retain_multiple_version: bool,
    /// When set, table-stats changes are only collected for these tables.
    pub(crate) stats_target_table_ids: Option<HashSet<TableId>>,
    pub(crate) use_block_based_filter: bool,

    pub(crate) table_vnode_partition: BTreeMap<TableId, u32>,
    /// Schemas of the input tables, used by the drop-column optimization to skip columns that
    /// no longer exist in the schema.
    pub(crate) table_schemas: HashMap<TableId, PbTableSchema>,
    /// Disables the drop-column optimization above.
    pub(crate) disable_drop_column_optimization: bool,
}
142
143impl TaskConfig {
144 #[cfg(any(test, feature = "test"))]
145 pub fn for_test(
146 key_range: KeyRange,
147 cache_policy: CachePolicy,
148 gc_delete_keys: bool,
149 use_block_based_filter: bool,
150 table_schemas: HashMap<TableId, PbTableSchema>,
151 ) -> Self {
152 Self {
153 key_range,
154 cache_policy,
155 gc_delete_keys,
156 retain_multiple_version: false,
157 stats_target_table_ids: None,
158 use_block_based_filter,
159 table_vnode_partition: BTreeMap::default(),
160 table_schemas,
161 disable_drop_column_optimization: false,
162 }
163 }
164
165 #[cfg(any(test, feature = "test"))]
166 pub fn with_disable_drop_column_optimization(mut self, disable: bool) -> Self {
167 self.disable_drop_column_optimization = disable;
168 self
169 }
170}
171
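/// Builds the combined compaction filter from the task's `compaction_filter_mask`: a
/// state-clean-up filter that drops keys of tables no longer in `existing_table_ids`, and a TTL
/// filter that drops keys older than each table's `retention_seconds`.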
pub fn build_multi_compaction_filter(compact_task: &CompactTask) -> MultiCompactionFilter {
    let mut multi_filter = MultiCompactionFilter::default();
    let compaction_filter_flag =
        CompactionFilterFlag::from_bits(compact_task.compaction_filter_mask).unwrap_or_default();
    if compaction_filter_flag.contains(CompactionFilterFlag::STATE_CLEAN) {
        let state_clean_up_filter = Box::new(StateCleanUpCompactionFilter::new(
            HashSet::from_iter(compact_task.existing_table_ids.clone()),
        ));

        multi_filter.register(state_clean_up_filter);
    }

    if compaction_filter_flag.contains(CompactionFilterFlag::TTL) {
        let id_to_ttl = compact_task
            .table_options
            .iter()
            .filter_map(|(id, option)| {
                option
                    .retention_seconds
                    .and_then(|ttl| if ttl > 0 { Some((*id, ttl)) } else { None })
            })
            .collect();

        let ttl_filter = Box::new(TtlCompactionFilter::new(
            id_to_ttl,
            compact_task.current_epoch_time,
        ));
        multi_filter.register(ttl_filter);
    }

    multi_filter
}

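/// Cheap split generation used when there are too many input SSTs to preload their metadata:
/// instead of block-level sizes it uses only the SST boundary keys, cutting a split every
/// `indexes.len() / parallelism` boundary keys. Returns an empty vec when no split is needed.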
fn generate_splits_fast(
    sstable_infos: &Vec<SstableInfo>,
    compaction_size: u64,
    context: &CompactorContext,
    max_sub_compaction: u32,
) -> Vec<KeyRange> {
    let worker_num = context.compaction_executor.worker_num();
    let parallel_compact_size = (context.storage_opts.parallel_compact_size_mb as u64) << 20;

    let parallelism = calculate_task_parallelism_impl(
        worker_num,
        parallel_compact_size,
        compaction_size,
        max_sub_compaction,
    );
    let mut indexes = vec![];
    for sst in sstable_infos {
        let key_range = &sst.key_range;
        indexes.push(
            FullKey {
                user_key: FullKey::decode(&key_range.left).user_key,
                epoch_with_gap: EpochWithGap::new_max_epoch(),
            }
            .encode(),
        );
        indexes.push(
            FullKey {
                user_key: FullKey::decode(&key_range.right).user_key,
                epoch_with_gap: EpochWithGap::new_max_epoch(),
            }
            .encode(),
        );
    }
    indexes.sort_by(|a, b| KeyComparator::compare_encoded_full_key(a.as_ref(), b.as_ref()));
    indexes.dedup();
    if indexes.len() <= parallelism {
        return vec![];
    }

    let mut splits = vec![];
    splits.push(KeyRange::default());
    let parallel_key_count = indexes.len() / parallelism;
    let mut last_split_key_count = 0;
    for key in indexes {
        if last_split_key_count >= parallel_key_count {
            splits.last_mut().unwrap().right = Bytes::from(key.clone());
            splits.push(KeyRange::new(Bytes::from(key.clone()), Bytes::default()));
            last_split_key_count = 0;
        }
        last_split_key_count += 1;
    }

    splits
}

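/// Generates key-range splits so a task larger than `parallel_compact_size` can run as multiple
/// sub-compactions. Preloads the metadata of every input SST and cuts splits at block boundaries
/// so each split covers roughly `max(compaction_size / parallelism, parallel_compact_size)`
/// bytes; falls back to [`generate_splits_fast`] when there are more input SSTs than
/// `compactor_max_preload_meta_file_count`.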
pub async fn generate_splits(
    sstable_infos: &Vec<SstableInfo>,
    compaction_size: u64,
    context: &CompactorContext,
    max_sub_compaction: u32,
) -> HummockResult<Vec<KeyRange>> {
    let parallel_compact_size = (context.storage_opts.parallel_compact_size_mb as u64) << 20;
    if compaction_size > parallel_compact_size {
        if sstable_infos.len() > context.storage_opts.compactor_max_preload_meta_file_count {
            return Ok(generate_splits_fast(
                sstable_infos,
                compaction_size,
                context,
                max_sub_compaction,
            ));
        }
        let mut indexes = vec![];
        for sstable_info in sstable_infos {
            indexes.extend(
                context
                    .sstable_store
                    .sstable(sstable_info, &mut StoreLocalStatistic::default())
                    .await?
                    .meta
                    .block_metas
                    .iter()
                    .map(|block| {
                        let data_size = block.len;
                        let full_key = FullKey {
                            user_key: FullKey::decode(&block.smallest_key).user_key,
                            epoch_with_gap: EpochWithGap::new_max_epoch(),
                        }
                        .encode();
                        (data_size as u64, full_key)
                    })
                    .collect_vec(),
            );
        }
        indexes.sort_by(|a, b| KeyComparator::compare_encoded_full_key(a.1.as_ref(), b.1.as_ref()));
        let mut splits = vec![];
        splits.push(KeyRange::default());

        let parallelism = calculate_task_parallelism_impl(
            context.compaction_executor.worker_num(),
            parallel_compact_size,
            compaction_size,
            max_sub_compaction,
        );

        let sub_compaction_data_size =
            std::cmp::max(compaction_size / parallelism as u64, parallel_compact_size);

        if parallelism > 1 {
            let mut last_buffer_size = 0;
            let mut last_key: Vec<u8> = vec![];
            let mut remaining_size = indexes.iter().map(|block| block.0).sum::<u64>();
            for (data_size, key) in indexes {
                if last_buffer_size >= sub_compaction_data_size
                    && !last_key.eq(&key)
                    && remaining_size > parallel_compact_size
                {
                    splits.last_mut().unwrap().right = Bytes::from(key.clone());
                    splits.push(KeyRange::new(Bytes::from(key.clone()), Bytes::default()));
                    last_buffer_size = data_size;
                } else {
                    last_buffer_size += data_size;
                }
                remaining_size -= data_size;
                last_key = key;
            }
            return Ok(splits);
        }
    }

    Ok(vec![])
}

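/// Estimates the per-SST output capacity for a task: the smallest of the task's
/// `target_file_size`, the configured `sstable_size_mb`, and the total uncompressed size of the
/// input SSTs.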
pub fn estimate_task_output_capacity(context: CompactorContext, task: &CompactTask) -> usize {
    let max_target_file_size = context.storage_opts.sstable_size_mb as usize * (1 << 20);
    let total_input_uncompressed_file_size = task
        .input_ssts
        .iter()
        .flat_map(|level| level.table_infos.iter())
        .map(|table| table.uncompressed_file_size)
        .sum::<u64>();

    let capacity = std::cmp::min(task.target_file_size as usize, max_target_file_size);
    std::cmp::min(capacity, total_input_uncompressed_file_size as usize)
}

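/// Verifies a finished compaction by replaying it: merges the input SSTs (applying the same
/// watermark filtering the compactor applies) and compares them key by key against
/// `sorted_output_ssts`. Tasks that legitimately drop data (TTL or state clean-up) are skipped
/// and treated as passing.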
pub async fn check_compaction_result(
    compact_task: &CompactTask,
    context: CompactorContext,
    compaction_catalog_agent_ref: CompactionCatalogAgentRef,
) -> HummockResult<bool> {
    let mut table_ids_from_input_ssts = compact_task.get_table_ids_from_input_ssts();
    let need_clean_state_table = table_ids_from_input_ssts
        .any(|table_id| !compact_task.existing_table_ids.contains(&table_id));
    // Skip the check when the task drops data on purpose (TTL or state clean-up), since input
    // and output are then expected to differ.
    if compact_task.contains_ttl() || need_clean_state_table {
        return Ok(true);
    }

    let mut table_iters = Vec::new();
    for level in &compact_task.input_ssts {
        if level.table_infos.is_empty() {
            continue;
        }

        // Non-overlapping levels can be read with a single concat iterator.
        if level.level_type == PbLevelType::Nonoverlapping {
            debug_assert!(can_concat(&level.table_infos));

            table_iters.push(ConcatSstableIterator::new(
                compact_task.existing_table_ids.clone(),
                level.table_infos.clone(),
                KeyRange::inf(),
                context.sstable_store.clone(),
                Arc::new(TaskProgress::default()),
                context.storage_opts.compactor_iter_max_io_retry_times,
            ));
        } else {
            for table_info in &level.table_infos {
                table_iters.push(ConcatSstableIterator::new(
                    compact_task.existing_table_ids.clone(),
                    vec![table_info.clone()],
                    KeyRange::inf(),
                    context.sstable_store.clone(),
                    Arc::new(TaskProgress::default()),
                    context.storage_opts.compactor_iter_max_io_retry_times,
                ));
            }
        }
    }

    let iter = MergeIterator::for_compactor(table_iters);
    let left_iter = {
        let skip_watermark_iter = PkPrefixSkipWatermarkIterator::new(
            iter,
            PkPrefixSkipWatermarkState::from_safe_epoch_watermarks(
                compact_task.pk_prefix_table_watermarks.clone(),
            ),
        );

        let combine_iter = NonPkPrefixSkipWatermarkIterator::new(
            skip_watermark_iter,
            NonPkPrefixSkipWatermarkState::from_safe_epoch_watermarks(
                compact_task.non_pk_prefix_table_watermarks.clone(),
                compaction_catalog_agent_ref.clone(),
            ),
        );

        UserIterator::new(
            combine_iter,
            (Bound::Unbounded, Bound::Unbounded),
            u64::MAX,
            0,
            None,
        )
    };
    let iter = ConcatSstableIterator::new(
        compact_task.existing_table_ids.clone(),
        compact_task.sorted_output_ssts.clone(),
        KeyRange::inf(),
        context.sstable_store.clone(),
        Arc::new(TaskProgress::default()),
        context.storage_opts.compactor_iter_max_io_retry_times,
    );
    let right_iter = {
        let skip_watermark_iter = PkPrefixSkipWatermarkIterator::new(
            iter,
            PkPrefixSkipWatermarkState::from_safe_epoch_watermarks(
                compact_task.pk_prefix_table_watermarks.clone(),
            ),
        );

        let combine_iter = NonPkPrefixSkipWatermarkIterator::new(
            skip_watermark_iter,
            NonPkPrefixSkipWatermarkState::from_safe_epoch_watermarks(
                compact_task.non_pk_prefix_table_watermarks.clone(),
                compaction_catalog_agent_ref,
            ),
        );

        UserIterator::new(
            combine_iter,
            (Bound::Unbounded, Bound::Unbounded),
            u64::MAX,
            0,
            None,
        )
    };

    check_result(left_iter, right_iter).await
}

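/// Same input/output equivalence check as [`check_compaction_result`], but for a flush: compares
/// `left_iter` (the data that was flushed) against the SSTs it produced.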
pub async fn check_flush_result<I: HummockIterator<Direction = Forward>>(
    left_iter: UserIterator<I>,
    existing_table_ids: Vec<StateTableId>,
    sort_ssts: Vec<SstableInfo>,
    context: CompactorContext,
) -> HummockResult<bool> {
    let iter = ConcatSstableIterator::new(
        existing_table_ids.clone(),
        sort_ssts.clone(),
        KeyRange::inf(),
        context.sstable_store.clone(),
        Arc::new(TaskProgress::default()),
        0,
    );
    let right_iter = UserIterator::new(
        iter,
        (Bound::Unbounded, Bound::Unbounded),
        u64::MAX,
        0,
        None,
    );
    check_result(left_iter, right_iter).await
}

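/// Walks both iterators in lockstep and returns `false` (logging the first mismatch) if they
/// disagree on any key, value, or total key count.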
async fn check_result<
    I1: HummockIterator<Direction = Forward>,
    I2: HummockIterator<Direction = Forward>,
>(
    mut left_iter: UserIterator<I1>,
    mut right_iter: UserIterator<I2>,
) -> HummockResult<bool> {
    left_iter.rewind().await?;
    right_iter.rewind().await?;
    let mut right_count = 0;
    let mut left_count = 0;
    while left_iter.is_valid() && right_iter.is_valid() {
        if left_iter.key() != right_iter.key() {
            tracing::error!(
                "The keys of input and output are not equal. key: {:?} vs {:?}",
                left_iter.key(),
                right_iter.key()
            );
            return Ok(false);
        }
        if left_iter.value() != right_iter.value() {
            tracing::error!(
                "The values of input and output are not equal. key: {:?}, value: {:?} vs {:?}",
                left_iter.key(),
                left_iter.value(),
                right_iter.value()
            );
            return Ok(false);
        }
        left_iter.next().await?;
        right_iter.next().await?;
        left_count += 1;
        right_count += 1;
    }
    while left_iter.is_valid() {
        left_count += 1;
        left_iter.next().await?;
    }
    while right_iter.is_valid() {
        right_count += 1;
        right_iter.next().await?;
    }
    if left_count != right_count {
        tracing::error!(
            "The key counts of input and output are not equal: {} vs {}",
            left_count,
            right_count
        );
        return Ok(false);
    }
    Ok(true)
}

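/// Decides whether the task qualifies for the fast "copy block" compaction path: a single-table,
/// two-level dynamic task with block-based bloom filters, no range tombstones, TTL, or split
/// SSTs, whose size and delete ratio are below the configured fast-compaction thresholds.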
pub fn optimize_by_copy_block(compact_task: &CompactTask, context: &CompactorContext) -> bool {
    let sstable_infos = compact_task
        .input_ssts
        .iter()
        .flat_map(|level| level.table_infos.iter())
        .filter(|table_info| {
            let table_ids = &table_info.table_ids;
            table_ids
                .iter()
                .any(|table_id| compact_task.existing_table_ids.contains(table_id))
        })
        .cloned()
        .collect_vec();
    let compaction_size = sstable_infos
        .iter()
        .map(|table_info| table_info.sst_size)
        .sum::<u64>();

    let all_ssts_are_blocked_filter = sstable_infos
        .iter()
        .all(|table_info| table_info.bloom_filter_kind == BloomFilterType::Blocked);

    let delete_key_count = sstable_infos
        .iter()
        .map(|table_info| table_info.stale_key_count + table_info.range_tombstone_count)
        .sum::<u64>();
    let total_key_count = sstable_infos
        .iter()
        .map(|table_info| table_info.total_key_count)
        .sum::<u64>();

    let single_table = compact_task.build_compact_table_ids().len() == 1;
    context.storage_opts.enable_fast_compaction
        && all_ssts_are_blocked_filter
        && !compact_task.contains_range_tombstone()
        && !compact_task.contains_ttl()
        && !compact_task.contains_split_sst()
        && single_table
        && compact_task.target_level > 0
        && compact_task.input_ssts.len() == 2
        && compaction_size < context.storage_opts.compactor_fast_max_compact_task_size
        && delete_key_count * 100
            < context.storage_opts.compactor_fast_max_compact_delete_ratio as u64 * total_key_count
        && compact_task.task_type == PbTaskType::Dynamic
}

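/// Populates `compact_task.splits` via [`generate_splits`], unless the task will take the
/// copy-block fast path, which always runs single-threaded.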
pub async fn generate_splits_for_task(
    compact_task: &mut CompactTask,
    context: &CompactorContext,
    optimize_by_copy_block: bool,
) -> HummockResult<()> {
    let sstable_infos = compact_task
        .input_ssts
        .iter()
        .flat_map(|level| level.table_infos.iter())
        .filter(|table_info| {
            let table_ids = &table_info.table_ids;
            table_ids
                .iter()
                .any(|table_id| compact_task.existing_table_ids.contains(table_id))
        })
        .cloned()
        .collect_vec();
    let compaction_size = sstable_infos
        .iter()
        .map(|table_info| table_info.sst_size)
        .sum::<u64>();

    if !optimize_by_copy_block {
        let splits = generate_splits(
            &sstable_infos,
            compaction_size,
            context,
            compact_task.max_sub_compaction,
        )
        .await?;
        if !splits.is_empty() {
            compact_task.splits = splits;
        }
    }

    Ok(())
}

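/// Reports read-side metrics for a task: bytes and SST counts read from the current (select)
/// level and from the target (next) level, labeled by compaction group and level index.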
pub fn metrics_report_for_task(compact_task: &CompactTask, context: &CompactorContext) {
    let group_label = compact_task.compaction_group_id.to_string();
    let cur_level_label = compact_task.input_ssts[0].level_idx.to_string();
    let select_table_infos = compact_task
        .input_ssts
        .iter()
        .filter(|level| level.level_idx != compact_task.target_level)
        .flat_map(|level| level.table_infos.iter())
        .collect_vec();
    let target_table_infos = compact_task
        .input_ssts
        .iter()
        .filter(|level| level.level_idx == compact_task.target_level)
        .flat_map(|level| level.table_infos.iter())
        .collect_vec();
    let select_size = select_table_infos
        .iter()
        .map(|table| table.sst_size)
        .sum::<u64>();
    context
        .compactor_metrics
        .compact_read_current_level
        .with_label_values(&[&group_label, &cur_level_label])
        .inc_by(select_size);
    context
        .compactor_metrics
        .compact_read_sstn_current_level
        .with_label_values(&[&group_label, &cur_level_label])
        .inc_by(select_table_infos.len() as u64);

    let target_level_read_bytes = target_table_infos.iter().map(|t| t.sst_size).sum::<u64>();
    let next_level_label = compact_task.target_level.to_string();
    context
        .compactor_metrics
        .compact_read_next_level
        .with_label_values(&[&group_label, &next_level_label])
        .inc_by(target_level_read_bytes);
    context
        .compactor_metrics
        .compact_read_sstn_next_level
        .with_label_values(&[&group_label, &next_level_label])
        .inc_by(target_table_infos.len() as u64);
}

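/// Computes the parallelism for a task from its effective compaction size; tasks on the
/// copy-block fast path always run with parallelism 1.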
pub fn calculate_task_parallelism(compact_task: &CompactTask, context: &CompactorContext) -> usize {
    let optimize_by_copy_block = optimize_by_copy_block(compact_task, context);

    if optimize_by_copy_block {
        return 1;
    }

    let sstable_infos = compact_task
        .input_ssts
        .iter()
        .flat_map(|level| level.table_infos.iter())
        .filter(|table_info| {
            let table_ids = &table_info.table_ids;
            table_ids
                .iter()
                .any(|table_id| compact_task.existing_table_ids.contains(table_id))
        })
        .cloned()
        .collect_vec();
    let compaction_size = sstable_infos
        .iter()
        .map(|table_info| table_info.sst_size)
        .sum::<u64>();
    let parallel_compact_size = (context.storage_opts.parallel_compact_size_mb as u64) << 20;
    calculate_task_parallelism_impl(
        context.compaction_executor.worker_num(),
        parallel_compact_size,
        compaction_size,
        compact_task.max_sub_compaction,
    )
}

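/// Parallelism is the task size in units of `parallel_compact_size`, capped by both
/// `max_sub_compaction` and the number of worker threads. For example (illustrative numbers):
/// with 8 workers, a 512 MiB parallel compact size, a 2 GiB task, and `max_sub_compaction = 4`,
/// this yields `min(8, min(ceil(2048 / 512), 4)) = 4`.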
pub fn calculate_task_parallelism_impl(
    worker_num: usize,
    parallel_compact_size: u64,
    compaction_size: u64,
    max_sub_compaction: u32,
) -> usize {
    let parallelism = compaction_size.div_ceil(parallel_compact_size);
    worker_num.min(parallelism.min(max_sub_compaction as u64) as usize)
}