use std::collections::{BTreeMap, HashMap, HashSet};
use std::marker::PhantomData;
use std::ops::Bound;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};

use bytes::Bytes;
use itertools::Itertools;
use risingwave_common::catalog::TableId;
use risingwave_common::constants::hummock::CompactionFilterFlag;
use risingwave_hummock_sdk::compact_task::CompactTask;
use risingwave_hummock_sdk::compaction_group::StateTableId;
use risingwave_hummock_sdk::key::FullKey;
use risingwave_hummock_sdk::key_range::KeyRange;
use risingwave_hummock_sdk::sstable_info::SstableInfo;
use risingwave_hummock_sdk::table_stats::TableStatsMap;
use risingwave_hummock_sdk::{EpochWithGap, KeyComparator, can_concat};
use risingwave_pb::hummock::compact_task::PbTaskType;
use risingwave_pb::hummock::{BloomFilterType, PbLevelType, PbTableSchema};
use tokio::time::Instant;

pub use super::context::CompactorContext;
use crate::compaction_catalog_manager::CompactionCatalogAgentRef;
use crate::hummock::compactor::{
    ConcatSstableIterator, MultiCompactionFilter, StateCleanUpCompactionFilter, TaskProgress,
    TtlCompactionFilter,
};
use crate::hummock::iterator::{
    Forward, HummockIterator, MergeIterator, NonPkPrefixSkipWatermarkIterator,
    NonPkPrefixSkipWatermarkState, PkPrefixSkipWatermarkIterator, PkPrefixSkipWatermarkState,
    UserIterator,
};
use crate::hummock::multi_builder::TableBuilderFactory;
use crate::hummock::sstable::DEFAULT_ENTRY_SIZE;
use crate::hummock::{
    CachePolicy, FilterBuilder, GetObjectId, HummockResult, MemoryLimiter, SstableBuilder,
    SstableBuilderOptions, SstableWriterFactory, SstableWriterOptions,
};
use crate::monitor::StoreLocalStatistic;

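/// Builds [`SstableBuilder`]s for a compaction task: each `open_builder` call fetches a fresh SST
/// object id from `object_id_getter` (accumulating the RPC latency into `remote_rpc_cost`) and
/// wires together the configured writer factory, filter builder, cache policy and memory limiter.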
pub struct RemoteBuilderFactory<W: SstableWriterFactory, F: FilterBuilder> {
    pub object_id_getter: Arc<dyn GetObjectId>,
    pub limiter: Arc<MemoryLimiter>,
    pub options: SstableBuilderOptions,
    pub policy: CachePolicy,
    pub remote_rpc_cost: Arc<AtomicU64>,
    pub compaction_catalog_agent_ref: CompactionCatalogAgentRef,
    pub sstable_writer_factory: W,
    pub _phantom: PhantomData<F>,
}

#[async_trait::async_trait]
impl<W: SstableWriterFactory, F: FilterBuilder> TableBuilderFactory for RemoteBuilderFactory<W, F> {
    type Filter = F;
    type Writer = W::Writer;

    async fn open_builder(&mut self) -> HummockResult<SstableBuilder<Self::Writer, Self::Filter>> {
        let timer = Instant::now();
        let object_id = self.object_id_getter.get_new_sst_object_id().await?;
        // Record how long the object-id RPC took, in microseconds.
        let cost = (timer.elapsed().as_secs_f64() * 1000000.0).round() as u64;
        self.remote_rpc_cost.fetch_add(cost, Ordering::Relaxed);
        let writer_options = SstableWriterOptions {
            capacity_hint: Some(self.options.capacity + self.options.block_capacity),
            tracker: None,
            policy: self.policy,
        };
        let writer = self
            .sstable_writer_factory
            .create_sst_writer(object_id, writer_options)
            .await?;
        let builder = SstableBuilder::new(
            object_id,
            writer,
            Self::Filter::create(
                self.options.bloom_false_positive,
                self.options.capacity / DEFAULT_ENTRY_SIZE + 1,
            ),
            self.options.clone(),
            self.compaction_catalog_agent_ref.clone(),
            Some(self.limiter.clone()),
        );
        Ok(builder)
    }
}

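/// Statistics collected while draining the compaction input iterator.
///
/// A minimal sketch of how `delete_ratio` behaves (illustrative only; note the integer division):
///
/// ```ignore
/// let stats = CompactionStatistics {
///     iter_total_key_counts: 100,
///     iter_drop_key_counts: 30,
///     ..Default::default()
/// };
/// // 30 / 100 truncates to 0; only dropping every key yields Some(1).
/// assert_eq!(stats.delete_ratio(), Some(0));
/// ```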
#[derive(Default, Debug)]
pub struct CompactionStatistics {
    /// Per-table stats deltas (dropped keys and sizes) accumulated by this compaction.
    pub delta_drop_stat: TableStatsMap,

    /// Total number of keys read from the compaction input.
    pub iter_total_key_counts: u64,
    /// Number of keys dropped by compaction (filters, watermarks, stale versions).
    pub iter_drop_key_counts: u64,
}

impl CompactionStatistics {
    #[allow(dead_code)]
    fn delete_ratio(&self) -> Option<u64> {
        if self.iter_total_key_counts == 0 {
            return None;
        }

        // Integer division: the ratio truncates toward zero.
        Some(self.iter_drop_key_counts / self.iter_total_key_counts)
    }
}

#[derive(Clone, Default)]
pub struct TaskConfig {
    pub key_range: KeyRange,
    pub cache_policy: CachePolicy,
    pub gc_delete_keys: bool,
    pub retain_multiple_version: bool,
    /// Table ids whose per-table stats changes should be tracked; `None` tracks all tables.
    pub stats_target_table_ids: Option<HashSet<TableId>>,
    pub task_type: PbTaskType,
    pub use_block_based_filter: bool,

    pub table_vnode_partition: BTreeMap<TableId, u32>,
    /// `TableId` -> `TableSchema`. Columns absent from the schema recorded here may have been
    /// dropped and can be pruned while rewriting the data.
    pub table_schemas: HashMap<TableId, PbTableSchema>,
    /// Disables pruning of dropped columns (mainly useful for tests and benchmarks).
    pub disable_drop_column_optimization: bool,
}

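/// Assembles a [`MultiCompactionFilter`] from the task's `compaction_filter_mask`: a state-clean
/// filter when `STATE_CLEAN` is set, and a TTL filter (fed with the per-table retention settings)
/// when `TTL` is set.
///
/// A small sketch of how the bit mask is interpreted (illustrative only):
///
/// ```ignore
/// let mask = (CompactionFilterFlag::STATE_CLEAN | CompactionFilterFlag::TTL).bits();
/// let flag = CompactionFilterFlag::from_bits(mask).unwrap_or_default();
/// assert!(flag.contains(CompactionFilterFlag::STATE_CLEAN));
/// assert!(flag.contains(CompactionFilterFlag::TTL));
/// ```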
pub fn build_multi_compaction_filter(compact_task: &CompactTask) -> MultiCompactionFilter {
    let mut multi_filter = MultiCompactionFilter::default();
    let compaction_filter_flag =
        CompactionFilterFlag::from_bits(compact_task.compaction_filter_mask).unwrap_or_default();
    if compaction_filter_flag.contains(CompactionFilterFlag::STATE_CLEAN) {
        let state_clean_up_filter = Box::new(StateCleanUpCompactionFilter::new(
            HashSet::from_iter(compact_task.existing_table_ids.clone()),
        ));

        multi_filter.register(state_clean_up_filter);
    }

    if compaction_filter_flag.contains(CompactionFilterFlag::TTL) {
        let id_to_ttl = compact_task
            .table_options
            .iter()
            .filter_map(|(id, option)| {
                option
                    .retention_seconds
                    .and_then(|ttl| if ttl > 0 { Some((*id, ttl)) } else { None })
            })
            .collect();

        let ttl_filter = Box::new(TtlCompactionFilter::new(
            id_to_ttl,
            compact_task.current_epoch_time,
        ));
        multi_filter.register(ttl_filter);
    }

    multi_filter
}

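/// Cheap split generation used when there are too many input SSTs to preload their metadata:
/// it only looks at each SST's key-range endpoints, sorts and dedups them, and cuts the key space
/// into roughly `parallelism` ranges with an equal number of endpoints per range. Returns an empty
/// vec when there are not enough endpoints to justify splitting.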
fn generate_splits_fast(
    sstable_infos: &Vec<SstableInfo>,
    compaction_size: u64,
    context: &CompactorContext,
    max_sub_compaction: u32,
) -> Vec<KeyRange> {
    let worker_num = context.compaction_executor.worker_num();
    let parallel_compact_size = (context.storage_opts.parallel_compact_size_mb as u64) << 20;

    let parallelism = calculate_task_parallelism_impl(
        worker_num,
        parallel_compact_size,
        compaction_size,
        max_sub_compaction,
    );
    let mut indexes = vec![];
    for sst in sstable_infos {
        let key_range = &sst.key_range;
        indexes.push(
            FullKey {
                user_key: FullKey::decode(&key_range.left).user_key,
                epoch_with_gap: EpochWithGap::new_max_epoch(),
            }
            .encode(),
        );
        indexes.push(
            FullKey {
                user_key: FullKey::decode(&key_range.right).user_key,
                epoch_with_gap: EpochWithGap::new_max_epoch(),
            }
            .encode(),
        );
    }
    indexes.sort_by(|a, b| KeyComparator::compare_encoded_full_key(a.as_ref(), b.as_ref()));
    indexes.dedup();
    if indexes.len() <= parallelism {
        return vec![];
    }

    let mut splits = vec![];
    splits.push(KeyRange::default());
    let parallel_key_count = indexes.len() / parallelism;
    let mut last_split_key_count = 0;
    for key in indexes {
        if last_split_key_count >= parallel_key_count {
            splits.last_mut().unwrap().right = Bytes::from(key.clone());
            splits.push(KeyRange::new(Bytes::from(key.clone()), Bytes::default()));
            last_split_key_count = 0;
        }
        last_split_key_count += 1;
    }

    splits
}

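/// Generates sub-compaction key ranges for a task that is large enough to parallelize
/// (`compaction_size > parallel_compact_size`). It preloads every input SST's metadata, collects
/// `(block size, smallest key)` pairs, and closes a split once roughly
/// `compaction_size / parallelism` bytes have accumulated. Falls back to [`generate_splits_fast`]
/// when there are more input SSTs than `compactor_max_preload_meta_file_count`, and returns an
/// empty vec when no split is needed.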
pub async fn generate_splits(
    sstable_infos: &Vec<SstableInfo>,
    compaction_size: u64,
    context: &CompactorContext,
    max_sub_compaction: u32,
) -> HummockResult<Vec<KeyRange>> {
    let parallel_compact_size = (context.storage_opts.parallel_compact_size_mb as u64) << 20;
    if compaction_size > parallel_compact_size {
        if sstable_infos.len() > context.storage_opts.compactor_max_preload_meta_file_count {
            return Ok(generate_splits_fast(
                sstable_infos,
                compaction_size,
                context,
                max_sub_compaction,
            ));
        }
        let mut indexes = vec![];
        for sstable_info in sstable_infos {
            // Preload the SST metadata and collect (block size, smallest key) pairs as
            // candidate split points.
            indexes.extend(
                context
                    .sstable_store
                    .sstable(sstable_info, &mut StoreLocalStatistic::default())
                    .await?
                    .meta
                    .block_metas
                    .iter()
                    .map(|block| {
                        let data_size = block.len;
                        let full_key = FullKey {
                            user_key: FullKey::decode(&block.smallest_key).user_key,
                            epoch_with_gap: EpochWithGap::new_max_epoch(),
                        }
                        .encode();
                        (data_size as u64, full_key)
                    })
                    .collect_vec(),
            );
        }
        indexes.sort_by(|a, b| KeyComparator::compare_encoded_full_key(a.1.as_ref(), b.1.as_ref()));

        let mut splits = vec![];
        splits.push(KeyRange::default());

        let parallelism = calculate_task_parallelism_impl(
            context.compaction_executor.worker_num(),
            parallel_compact_size,
            compaction_size,
            max_sub_compaction,
        );

        let sub_compaction_data_size =
            std::cmp::max(compaction_size / parallelism as u64, parallel_compact_size);

        if parallelism > 1 {
            let mut last_buffer_size = 0;
            let mut last_key: Vec<u8> = vec![];
            let mut remaining_size = indexes.iter().map(|block| block.0).sum::<u64>();
            for (data_size, key) in indexes {
                if last_buffer_size >= sub_compaction_data_size
                    && !last_key.eq(&key)
                    && remaining_size > parallel_compact_size
                {
                    splits.last_mut().unwrap().right = Bytes::from(key.clone());
                    splits.push(KeyRange::new(Bytes::from(key.clone()), Bytes::default()));
                    last_buffer_size = data_size;
                } else {
                    last_buffer_size += data_size;
                }
                remaining_size -= data_size;
                last_key = key;
            }
            return Ok(splits);
        }
    }

    Ok(vec![])
}

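/// Estimates the capacity (in bytes) of each output SST: the smallest of the task's
/// `target_file_size`, the configured `sstable_size_mb`, and the total uncompressed size of the
/// input. For example, with a 256 MiB `sstable_size_mb`, a 512 MiB `target_file_size` and 1 GiB
/// of input, the estimate is 256 MiB (hypothetical numbers, for illustration only).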
pub fn estimate_task_output_capacity(context: CompactorContext, task: &CompactTask) -> usize {
    let max_target_file_size = context.storage_opts.sstable_size_mb as usize * (1 << 20);
    let total_input_uncompressed_file_size = task
        .input_ssts
        .iter()
        .flat_map(|level| level.table_infos.iter())
        .map(|table| table.uncompressed_file_size)
        .sum::<u64>();

    let capacity = std::cmp::min(task.target_file_size as usize, max_target_file_size);
    std::cmp::min(capacity, total_input_uncompressed_file_size as usize)
}

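/// Sanity-checks a finished compaction by merging the input SSTs and the output SSTs through the
/// same watermark-skipping `UserIterator` stack and comparing them key by key. Tasks that drop
/// keys on purpose (TTL or state-table cleanup) are skipped and reported as passing.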
pub async fn check_compaction_result(
    compact_task: &CompactTask,
    context: CompactorContext,
    compaction_catalog_agent_ref: CompactionCatalogAgentRef,
) -> HummockResult<bool> {
    let mut table_ids_from_input_ssts = compact_task.get_table_ids_from_input_ssts();
    let need_clean_state_table = table_ids_from_input_ssts
        .any(|table_id| !compact_task.existing_table_ids.contains(&table_id));
    if compact_task.contains_ttl() || need_clean_state_table {
        // TTL compaction and state-table cleanup drop keys by design, so the output is not
        // comparable with the input; skip the check.
        return Ok(true);
    }

    let mut table_iters = Vec::new();
    for level in &compact_task.input_ssts {
        if level.table_infos.is_empty() {
            continue;
        }

        if level.level_type == PbLevelType::Nonoverlapping {
            debug_assert!(can_concat(&level.table_infos));

            table_iters.push(ConcatSstableIterator::new(
                compact_task.existing_table_ids.clone(),
                level.table_infos.clone(),
                KeyRange::inf(),
                context.sstable_store.clone(),
                Arc::new(TaskProgress::default()),
                context.storage_opts.compactor_iter_max_io_retry_times,
            ));
        } else {
            for table_info in &level.table_infos {
                table_iters.push(ConcatSstableIterator::new(
                    compact_task.existing_table_ids.clone(),
                    vec![table_info.clone()],
                    KeyRange::inf(),
                    context.sstable_store.clone(),
                    Arc::new(TaskProgress::default()),
                    context.storage_opts.compactor_iter_max_io_retry_times,
                ));
            }
        }
    }

    let iter = MergeIterator::for_compactor(table_iters);
    let left_iter = {
        let skip_watermark_iter = PkPrefixSkipWatermarkIterator::new(
            iter,
            PkPrefixSkipWatermarkState::from_safe_epoch_watermarks(
                compact_task.pk_prefix_table_watermarks.clone(),
            ),
        );

        let combine_iter = NonPkPrefixSkipWatermarkIterator::new(
            skip_watermark_iter,
            NonPkPrefixSkipWatermarkState::from_safe_epoch_watermarks(
                compact_task.non_pk_prefix_table_watermarks.clone(),
                compaction_catalog_agent_ref.clone(),
            ),
        );

        UserIterator::new(
            combine_iter,
            (Bound::Unbounded, Bound::Unbounded),
            u64::MAX,
            0,
            None,
        )
    };
    let iter = ConcatSstableIterator::new(
        compact_task.existing_table_ids.clone(),
        compact_task.sorted_output_ssts.clone(),
        KeyRange::inf(),
        context.sstable_store.clone(),
        Arc::new(TaskProgress::default()),
        context.storage_opts.compactor_iter_max_io_retry_times,
    );
    let right_iter = {
        let skip_watermark_iter = PkPrefixSkipWatermarkIterator::new(
            iter,
            PkPrefixSkipWatermarkState::from_safe_epoch_watermarks(
                compact_task.pk_prefix_table_watermarks.clone(),
            ),
        );

        let combine_iter = NonPkPrefixSkipWatermarkIterator::new(
            skip_watermark_iter,
            NonPkPrefixSkipWatermarkState::from_safe_epoch_watermarks(
                compact_task.non_pk_prefix_table_watermarks.clone(),
                compaction_catalog_agent_ref,
            ),
        );

        UserIterator::new(
            combine_iter,
            (Bound::Unbounded, Bound::Unbounded),
            u64::MAX,
            0,
            None,
        )
    };

    check_result(left_iter, right_iter).await
}

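/// Flush counterpart of [`check_compaction_result`]: compares the provided `left_iter` (the data
/// that was flushed) against a `UserIterator` over `sort_ssts` (the SSTs the flush produced).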
pub async fn check_flush_result<I: HummockIterator<Direction = Forward>>(
    left_iter: UserIterator<I>,
    existing_table_ids: Vec<StateTableId>,
    sort_ssts: Vec<SstableInfo>,
    context: CompactorContext,
) -> HummockResult<bool> {
    let iter = ConcatSstableIterator::new(
        existing_table_ids.clone(),
        sort_ssts.clone(),
        KeyRange::inf(),
        context.sstable_store.clone(),
        Arc::new(TaskProgress::default()),
        0,
    );
    let right_iter = UserIterator::new(
        iter,
        (Bound::Unbounded, Bound::Unbounded),
        u64::MAX,
        0,
        None,
    );
    check_result(left_iter, right_iter).await
}

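/// Compares two user iterators entry by entry; logs and returns `Ok(false)` on the first key,
/// value, or total-count mismatch, and `Ok(true)` if both streams are identical.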
async fn check_result<
    I1: HummockIterator<Direction = Forward>,
    I2: HummockIterator<Direction = Forward>,
>(
    mut left_iter: UserIterator<I1>,
    mut right_iter: UserIterator<I2>,
) -> HummockResult<bool> {
    left_iter.rewind().await?;
    right_iter.rewind().await?;
    let mut right_count = 0;
    let mut left_count = 0;
    while left_iter.is_valid() && right_iter.is_valid() {
        if left_iter.key() != right_iter.key() {
            tracing::error!(
                "The keys of the input and output are not equal. key: {:?} vs {:?}",
                left_iter.key(),
                right_iter.key()
            );
            return Ok(false);
        }
        if left_iter.value() != right_iter.value() {
            tracing::error!(
                "The values of the input and output are not equal. key: {:?}, value: {:?} vs {:?}",
                left_iter.key(),
                left_iter.value(),
                right_iter.value()
            );
            return Ok(false);
        }
        left_iter.next().await?;
        right_iter.next().await?;
        left_count += 1;
        right_count += 1;
    }
    while left_iter.is_valid() {
        left_count += 1;
        left_iter.next().await?;
    }
    while right_iter.is_valid() {
        right_count += 1;
        right_iter.next().await?;
    }
    if left_count != right_count {
        tracing::error!(
            "The key counts of the input and output are not equal: {} vs {}",
            left_count,
            right_count
        );
        return Ok(false);
    }
    Ok(true)
}

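/// Decides whether the task is eligible for fast compaction (copying whole blocks instead of
/// re-encoding every key). Roughly: `enable_fast_compaction` must be on, the task must be a
/// dynamic-level task over a single table with exactly two input levels and a non-L0 target
/// level, every input SST must use a block-based bloom filter, the task must contain no TTL,
/// range tombstones or split SSTs, its total size must be below
/// `compactor_fast_max_compact_task_size`, and its delete-key ratio must stay below
/// `compactor_fast_max_compact_delete_ratio` percent.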
pub fn optimize_by_copy_block(compact_task: &CompactTask, context: &CompactorContext) -> bool {
    let sstable_infos = compact_task
        .input_ssts
        .iter()
        .flat_map(|level| level.table_infos.iter())
        .filter(|table_info| {
            let table_ids = &table_info.table_ids;
            table_ids
                .iter()
                .any(|table_id| compact_task.existing_table_ids.contains(table_id))
        })
        .cloned()
        .collect_vec();
    let compaction_size = sstable_infos
        .iter()
        .map(|table_info| table_info.sst_size)
        .sum::<u64>();

    let all_ssts_are_blocked_filter = sstable_infos
        .iter()
        .all(|table_info| table_info.bloom_filter_kind == BloomFilterType::Blocked);

    let delete_key_count = sstable_infos
        .iter()
        .map(|table_info| table_info.stale_key_count + table_info.range_tombstone_count)
        .sum::<u64>();
    let total_key_count = sstable_infos
        .iter()
        .map(|table_info| table_info.total_key_count)
        .sum::<u64>();

    let single_table = compact_task.build_compact_table_ids().len() == 1;
    context.storage_opts.enable_fast_compaction
        && all_ssts_are_blocked_filter
        && !compact_task.contains_range_tombstone()
        && !compact_task.contains_ttl()
        && !compact_task.contains_split_sst()
        && single_table
        && compact_task.target_level > 0
        && compact_task.input_ssts.len() == 2
        && compaction_size < context.storage_opts.compactor_fast_max_compact_task_size
        && delete_key_count * 100
            < context.storage_opts.compactor_fast_max_compact_delete_ratio as u64 * total_key_count
        && compact_task.task_type == PbTaskType::Dynamic
}

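/// Fills `compact_task.splits` with sub-compaction key ranges (via [`generate_splits`]) unless the
/// task will be handled by the block-copy fast path, in which case the splits are left untouched.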
pub async fn generate_splits_for_task(
    compact_task: &mut CompactTask,
    context: &CompactorContext,
    optimize_by_copy_block: bool,
) -> HummockResult<()> {
    let sstable_infos = compact_task
        .input_ssts
        .iter()
        .flat_map(|level| level.table_infos.iter())
        .filter(|table_info| {
            let table_ids = &table_info.table_ids;
            table_ids
                .iter()
                .any(|table_id| compact_task.existing_table_ids.contains(table_id))
        })
        .cloned()
        .collect_vec();
    let compaction_size = sstable_infos
        .iter()
        .map(|table_info| table_info.sst_size)
        .sum::<u64>();

    if !optimize_by_copy_block {
        let splits = generate_splits(
            &sstable_infos,
            compaction_size,
            context,
            compact_task.max_sub_compaction,
        )
        .await?;
        if !splits.is_empty() {
            compact_task.splits = splits;
        }
        return Ok(());
    }

    Ok(())
}

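/// Reports per-group read metrics for the task: bytes and SST counts read from the current
/// (non-target) levels and from the target level.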
pub fn metrics_report_for_task(compact_task: &CompactTask, context: &CompactorContext) {
    let group_label = compact_task.compaction_group_id.to_string();
    let cur_level_label = compact_task.input_ssts[0].level_idx.to_string();
    let select_table_infos = compact_task
        .input_ssts
        .iter()
        .filter(|level| level.level_idx != compact_task.target_level)
        .flat_map(|level| level.table_infos.iter())
        .collect_vec();
    let target_table_infos = compact_task
        .input_ssts
        .iter()
        .filter(|level| level.level_idx == compact_task.target_level)
        .flat_map(|level| level.table_infos.iter())
        .collect_vec();
    let select_size = select_table_infos
        .iter()
        .map(|table| table.sst_size)
        .sum::<u64>();
    context
        .compactor_metrics
        .compact_read_current_level
        .with_label_values(&[&group_label, &cur_level_label])
        .inc_by(select_size);
    context
        .compactor_metrics
        .compact_read_sstn_current_level
        .with_label_values(&[&group_label, &cur_level_label])
        .inc_by(select_table_infos.len() as u64);

    let target_level_read_bytes = target_table_infos.iter().map(|t| t.sst_size).sum::<u64>();
    let next_level_label = compact_task.target_level.to_string();
    context
        .compactor_metrics
        .compact_read_next_level
        .with_label_values(&[&group_label, &next_level_label])
        .inc_by(target_level_read_bytes);
    context
        .compactor_metrics
        .compact_read_sstn_next_level
        .with_label_values(&[&group_label, &next_level_label])
        .inc_by(target_table_infos.len() as u64);
}

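/// Computes the parallelism for a compaction task: 1 for the block-copy fast path, otherwise the
/// size-based parallelism of [`calculate_task_parallelism_impl`] over the SSTs that still belong
/// to existing tables.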
pub fn calculate_task_parallelism(compact_task: &CompactTask, context: &CompactorContext) -> usize {
    let optimize_by_copy_block = optimize_by_copy_block(compact_task, context);

    if optimize_by_copy_block {
        return 1;
    }

    let sstable_infos = compact_task
        .input_ssts
        .iter()
        .flat_map(|level| level.table_infos.iter())
        .filter(|table_info| {
            let table_ids = &table_info.table_ids;
            table_ids
                .iter()
                .any(|table_id| compact_task.existing_table_ids.contains(table_id))
        })
        .cloned()
        .collect_vec();
    let compaction_size = sstable_infos
        .iter()
        .map(|table_info| table_info.sst_size)
        .sum::<u64>();
    let parallel_compact_size = (context.storage_opts.parallel_compact_size_mb as u64) << 20;
    calculate_task_parallelism_impl(
        context.compaction_executor.worker_num(),
        parallel_compact_size,
        compaction_size,
        compact_task.max_sub_compaction,
    )
}

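/// Parallelism = `ceil(compaction_size / parallel_compact_size)`, capped by both
/// `max_sub_compaction` and the number of compaction worker threads.
///
/// A worked example with hypothetical numbers:
///
/// ```ignore
/// // 2 GiB of input at 512 MiB per sub-compaction => 4 splits,
/// // capped by max_sub_compaction = 4 and worker_num = 8 => 4.
/// assert_eq!(calculate_task_parallelism_impl(8, 512 << 20, 2 << 30, 4), 4);
/// ```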
pub fn calculate_task_parallelism_impl(
    worker_num: usize,
    parallel_compact_size: u64,
    compaction_size: u64,
    max_sub_compaction: u32,
) -> usize {
    let parallelism = compaction_size.div_ceil(parallel_compact_size);
    worker_num.min(parallelism.min(max_sub_compaction as u64) as usize)
}