1use std::collections::{BTreeMap, HashMap, HashSet};
16use std::marker::PhantomData;
17use std::ops::Bound;
18use std::sync::Arc;
19use std::sync::atomic::{AtomicU64, Ordering};
20
21use bytes::Bytes;
22use itertools::Itertools;
23use risingwave_common::constants::hummock::CompactionFilterFlag;
24use risingwave_hummock_sdk::compact_task::CompactTask;
25use risingwave_hummock_sdk::compaction_group::StateTableId;
26use risingwave_hummock_sdk::key::FullKey;
27use risingwave_hummock_sdk::key_range::KeyRange;
28use risingwave_hummock_sdk::sstable_info::SstableInfo;
29use risingwave_hummock_sdk::table_stats::TableStatsMap;
30use risingwave_hummock_sdk::{EpochWithGap, KeyComparator, can_concat};
31use risingwave_pb::hummock::compact_task::PbTaskType;
32use risingwave_pb::hummock::{BloomFilterType, PbLevelType, PbTableSchema};
33use tokio::time::Instant;
34
35pub use super::context::CompactorContext;
36use crate::compaction_catalog_manager::CompactionCatalogAgentRef;
37use crate::hummock::compactor::{
38 ConcatSstableIterator, MultiCompactionFilter, StateCleanUpCompactionFilter, TaskProgress,
39 TtlCompactionFilter,
40};
41use crate::hummock::iterator::{
42 Forward, HummockIterator, MergeIterator, NonPkPrefixSkipWatermarkIterator,
43 NonPkPrefixSkipWatermarkState, PkPrefixSkipWatermarkIterator, PkPrefixSkipWatermarkState,
44 UserIterator,
45};
46use crate::hummock::multi_builder::TableBuilderFactory;
47use crate::hummock::sstable::DEFAULT_ENTRY_SIZE;
48use crate::hummock::{
49 CachePolicy, FilterBuilder, GetObjectId, HummockResult, MemoryLimiter, SstableBuilder,
50 SstableBuilderOptions, SstableWriterFactory, SstableWriterOptions,
51};
52use crate::monitor::StoreLocalStatistic;
53
/// Factory that opens [`SstableBuilder`]s for compaction output: each builder
/// gets a fresh SST object id from `object_id_getter` and writes through a
/// writer created by `sstable_writer_factory`.
pub struct RemoteBuilderFactory<W: SstableWriterFactory, F: FilterBuilder> {
    /// Source of fresh SST object ids for each new builder.
    pub object_id_getter: Box<dyn GetObjectId>,
    /// Memory limiter handed to every builder this factory opens.
    pub limiter: Arc<MemoryLimiter>,
    /// Builder options (capacity, block capacity, bloom FP rate, ...).
    pub options: SstableBuilderOptions,
    /// Cache policy applied to the writers of the produced SSTs.
    pub policy: CachePolicy,
    /// Accumulated time (microseconds) spent fetching object ids;
    /// updated in `open_builder`.
    pub remote_rpc_cost: Arc<AtomicU64>,
    /// Catalog agent passed through to each builder.
    pub compaction_catalog_agent_ref: CompactionCatalogAgentRef,
    /// Factory producing the underlying SST writers.
    pub sstable_writer_factory: W,
    /// Carries the filter-builder type `F`; holds no runtime data.
    pub _phantom: PhantomData<F>,
}
64
#[async_trait::async_trait]
impl<W: SstableWriterFactory, F: FilterBuilder> TableBuilderFactory for RemoteBuilderFactory<W, F> {
    type Filter = F;
    type Writer = W::Writer;

    /// Opens a new SST builder backed by a freshly allocated object id.
    ///
    /// The latency of the id allocation (in microseconds) is folded into
    /// `remote_rpc_cost` before the writer and builder are constructed.
    async fn open_builder(&mut self) -> HummockResult<SstableBuilder<Self::Writer, Self::Filter>> {
        let timer = Instant::now();
        // NOTE(review): despite the name, this holds a new SST *object* id
        // (it comes from `get_new_sst_object_id`).
        let table_id = self.object_id_getter.get_new_sst_object_id().await?;
        // Elapsed time converted to whole microseconds.
        let cost = (timer.elapsed().as_secs_f64() * 1000000.0).round() as u64;
        self.remote_rpc_cost.fetch_add(cost, Ordering::Relaxed);
        let writer_options = SstableWriterOptions {
            // Reserve room for one extra block beyond the target capacity.
            capacity_hint: Some(self.options.capacity + self.options.block_capacity),
            tracker: None,
            policy: self.policy,
        };
        let writer = self
            .sstable_writer_factory
            .create_sst_writer(table_id, writer_options)
            .await?;
        let builder = SstableBuilder::new(
            table_id,
            writer,
            // Size the bloom filter for roughly one entry per
            // DEFAULT_ENTRY_SIZE bytes of capacity.
            Self::Filter::create(
                self.options.bloom_false_positive,
                self.options.capacity / DEFAULT_ENTRY_SIZE + 1,
            ),
            self.options.clone(),
            self.compaction_catalog_agent_ref.clone(),
            Some(self.limiter.clone()),
        );
        Ok(builder)
    }
}
98
/// Aggregated statistics produced by one compaction run.
#[derive(Default, Debug)]
pub struct CompactionStatistics {
    // Per-table stats deltas for the keys dropped during compaction.
    pub delta_drop_stat: TableStatsMap,

    // Total number of keys the compaction iterator visited.
    pub iter_total_key_counts: u64,
    // Number of visited keys that were dropped (filtered out).
    pub iter_drop_key_counts: u64,
}
109
110impl CompactionStatistics {
111 #[allow(dead_code)]
112 fn delete_ratio(&self) -> Option<u64> {
113 if self.iter_total_key_counts == 0 {
114 return None;
115 }
116
117 Some(self.iter_drop_key_counts / self.iter_total_key_counts)
118 }
119}
120
/// Per-task configuration assembled before running a compaction.
#[derive(Clone, Default)]
pub struct TaskConfig {
    /// Key range this (sub-)task is responsible for.
    pub key_range: KeyRange,
    /// Cache policy for the SSTs this task writes.
    pub cache_policy: CachePolicy,
    /// Whether delete tombstones may be garbage-collected
    /// (presumably bottom-most-level compaction — confirm at call sites).
    pub gc_delete_keys: bool,
    /// Whether multiple versions of a user key must be retained.
    pub retain_multiple_version: bool,
    /// When `Some`, restrict table-stats collection to these table ids.
    pub stats_target_table_ids: Option<HashSet<u32>>,
    pub task_type: PbTaskType,
    /// Whether to build block-based bloom filters for the output SSTs.
    pub use_block_based_filter: bool,

    /// table_id -> vnode partition count for splitting output SSTs.
    /// NOTE(review): semantics inferred from the name — confirm at use sites.
    pub table_vnode_partition: BTreeMap<u32, u32>,
    /// Per-table schemas consulted by the drop-column optimization;
    /// an absent entry means the schema is unknown for that table.
    pub table_schemas: HashMap<u32, PbTableSchema>,
    /// Escape hatch to turn the drop-column optimization off.
    pub disable_drop_column_optimization: bool,
}
142
143pub fn build_multi_compaction_filter(compact_task: &CompactTask) -> MultiCompactionFilter {
144 let mut multi_filter = MultiCompactionFilter::default();
145 let compaction_filter_flag =
146 CompactionFilterFlag::from_bits(compact_task.compaction_filter_mask).unwrap_or_default();
147 if compaction_filter_flag.contains(CompactionFilterFlag::STATE_CLEAN) {
148 let state_clean_up_filter = Box::new(StateCleanUpCompactionFilter::new(
149 HashSet::from_iter(compact_task.existing_table_ids.clone()),
150 ));
151
152 multi_filter.register(state_clean_up_filter);
153 }
154
155 if compaction_filter_flag.contains(CompactionFilterFlag::TTL) {
156 let id_to_ttl = compact_task
157 .table_options
158 .iter()
159 .filter_map(|(id, option)| {
160 option
161 .retention_seconds
162 .and_then(|ttl| if ttl > 0 { Some((*id, ttl)) } else { None })
163 })
164 .collect();
165
166 let ttl_filter = Box::new(TtlCompactionFilter::new(
167 id_to_ttl,
168 compact_task.current_epoch_time,
169 ));
170 multi_filter.register(ttl_filter);
171 }
172
173 multi_filter
174}
175
/// Cheap split generation used when there are too many input SSTs to preload
/// their metadata: splits on SST key-range boundaries instead of block
/// boundaries.
///
/// Returns an empty `Vec` when there are not enough distinct boundary keys to
/// justify `parallelism` sub-tasks (callers treat that as "no splitting").
fn generate_splits_fast(
    sstable_infos: &Vec<SstableInfo>,
    compaction_size: u64,
    context: &CompactorContext,
    max_sub_compaction: u32,
) -> Vec<KeyRange> {
    let worker_num = context.compaction_executor.worker_num();
    let parallel_compact_size = (context.storage_opts.parallel_compact_size_mb as u64) << 20;

    let parallelism = calculate_task_parallelism_impl(
        worker_num,
        parallel_compact_size,
        compaction_size,
        max_sub_compaction,
    );
    // Collect both endpoints of every SST's key range, normalized to full keys
    // with MAX epoch so they compare on the user key first.
    let mut indexes = vec![];
    for sst in sstable_infos {
        let key_range = &sst.key_range;
        indexes.push(
            FullKey {
                user_key: FullKey::decode(&key_range.left).user_key,
                epoch_with_gap: EpochWithGap::new_max_epoch(),
            }
            .encode(),
        );
        indexes.push(
            FullKey {
                user_key: FullKey::decode(&key_range.right).user_key,
                epoch_with_gap: EpochWithGap::new_max_epoch(),
            }
            .encode(),
        );
    }
    // Sort and dedup so the boundary keys are strictly increasing.
    indexes.sort_by(|a, b| KeyComparator::compare_encoded_full_key(a.as_ref(), b.as_ref()));
    indexes.dedup();
    if indexes.len() <= parallelism {
        return vec![];
    }

    // Greedily close the current range every `parallel_key_count` boundaries.
    // The first split starts unbounded on the left; the last ends unbounded on
    // the right. NOTE(review): if `parallelism` were 0 this would divide by
    // zero — callers appear to guarantee a positive value; confirm.
    let mut splits = vec![];
    splits.push(KeyRange::default());
    let parallel_key_count = indexes.len() / parallelism;
    let mut last_split_key_count = 0;
    for key in indexes {
        if last_split_key_count >= parallel_key_count {
            splits.last_mut().unwrap().right = Bytes::from(key.clone());
            splits.push(KeyRange::new(Bytes::from(key.clone()), Bytes::default()));
            last_split_key_count = 0;
        }
        last_split_key_count += 1;
    }

    splits
}
230
/// Computes key-range splits so a large compaction task can run as multiple
/// parallel sub-compactions.
///
/// Only tasks larger than `parallel_compact_size` are split. When there are
/// too many input SSTs to preload metadata, falls back to
/// [`generate_splits_fast`]; otherwise boundaries are chosen from per-block
/// smallest keys, weighted by block size. Returns an empty `Vec` when no
/// splitting is warranted.
pub async fn generate_splits(
    sstable_infos: &Vec<SstableInfo>,
    compaction_size: u64,
    context: &CompactorContext,
    max_sub_compaction: u32,
) -> HummockResult<Vec<KeyRange>> {
    let parallel_compact_size = (context.storage_opts.parallel_compact_size_mb as u64) << 20;
    if compaction_size > parallel_compact_size {
        if sstable_infos.len() > context.storage_opts.compactor_max_preload_meta_file_count {
            return Ok(generate_splits_fast(
                sstable_infos,
                compaction_size,
                context,
                max_sub_compaction,
            ));
        }
        // Gather (block_size, block_smallest_full_key) for every block of
        // every input SST; this loads each SST's metadata from the store.
        let mut indexes = vec![];
        for sstable_info in sstable_infos {
            indexes.extend(
                context
                    .sstable_store
                    .sstable(sstable_info, &mut StoreLocalStatistic::default())
                    .await?
                    .meta
                    .block_metas
                    .iter()
                    .map(|block| {
                        let data_size = block.len;
                        let full_key = FullKey {
                            user_key: FullKey::decode(&block.smallest_key).user_key,
                            // MAX epoch so ordering is by user key first.
                            epoch_with_gap: EpochWithGap::new_max_epoch(),
                        }
                        .encode();
                        (data_size as u64, full_key)
                    })
                    .collect_vec(),
            );
        }
        indexes.sort_by(|a, b| KeyComparator::compare_encoded_full_key(a.1.as_ref(), b.1.as_ref()));
        // The first split is unbounded on the left.
        let mut splits = vec![];
        splits.push(KeyRange::default());

        let parallelism = calculate_task_parallelism_impl(
            context.compaction_executor.worker_num(),
            parallel_compact_size,
            compaction_size,
            max_sub_compaction,
        );

        // Target data volume per sub-compaction; never below the configured
        // parallel compact size.
        let sub_compaction_data_size =
            std::cmp::max(compaction_size / parallelism as u64, parallel_compact_size);

        if parallelism > 1 {
            let mut last_buffer_size = 0;
            let mut last_key: Vec<u8> = vec![];
            let mut remaining_size = indexes.iter().map(|block| block.0).sum::<u64>();
            for (data_size, key) in indexes {
                // Close the current split once it is full — but never between
                // duplicate boundary keys, and only if the remaining data is
                // still large enough to be worth another sub-task.
                if last_buffer_size >= sub_compaction_data_size
                    && !last_key.eq(&key)
                    && remaining_size > parallel_compact_size
                {
                    splits.last_mut().unwrap().right = Bytes::from(key.clone());
                    splits.push(KeyRange::new(Bytes::from(key.clone()), Bytes::default()));
                    last_buffer_size = data_size;
                } else {
                    last_buffer_size += data_size;
                }
                remaining_size -= data_size;
                last_key = key;
            }
            return Ok(splits);
        }
    }

    Ok(vec![])
}
309
310pub fn estimate_task_output_capacity(context: CompactorContext, task: &CompactTask) -> usize {
311 let max_target_file_size = context.storage_opts.sstable_size_mb as usize * (1 << 20);
312 let total_input_uncompressed_file_size = task
313 .input_ssts
314 .iter()
315 .flat_map(|level| level.table_infos.iter())
316 .map(|table| table.uncompressed_file_size)
317 .sum::<u64>();
318
319 let capacity = std::cmp::min(task.target_file_size as usize, max_target_file_size);
320 std::cmp::min(capacity, total_input_uncompressed_file_size as usize)
321}
322
/// Validates a finished compaction by replaying the input SSTs and the output
/// SSTs and comparing them entry-by-entry.
///
/// Returns `Ok(true)` without checking when the task involves TTL or
/// state-table cleanup, since those filters legitimately drop data and the
/// two sides would not match.
pub async fn check_compaction_result(
    compact_task: &CompactTask,
    context: CompactorContext,
    compaction_catalog_agent_ref: CompactionCatalogAgentRef,
) -> HummockResult<bool> {
    let mut table_ids_from_input_ssts = compact_task.get_table_ids_from_input_ssts();
    let need_clean_state_table = table_ids_from_input_ssts
        .any(|table_id| !compact_task.existing_table_ids.contains(&table_id));
    // Tasks that intentionally drop data cannot be verified this way.
    if compact_task.contains_ttl() || need_clean_state_table {
        return Ok(true);
    }

    // Build one concat iterator per non-overlapping level and one per SST for
    // overlapping levels; they are merged into the "input" side below.
    let mut table_iters = Vec::new();
    for level in &compact_task.input_ssts {
        if level.table_infos.is_empty() {
            continue;
        }

        if level.level_type == PbLevelType::Nonoverlapping {
            debug_assert!(can_concat(&level.table_infos));

            table_iters.push(ConcatSstableIterator::new(
                compact_task.existing_table_ids.clone(),
                level.table_infos.clone(),
                KeyRange::inf(),
                context.sstable_store.clone(),
                Arc::new(TaskProgress::default()),
                context.storage_opts.compactor_iter_max_io_retry_times,
            ));
        } else {
            // Overlapping SSTs cannot be concatenated; wrap each one alone.
            for table_info in &level.table_infos {
                table_iters.push(ConcatSstableIterator::new(
                    compact_task.existing_table_ids.clone(),
                    vec![table_info.clone()],
                    KeyRange::inf(),
                    context.sstable_store.clone(),
                    Arc::new(TaskProgress::default()),
                    context.storage_opts.compactor_iter_max_io_retry_times,
                ));
            }
        }
    }

    let iter = MergeIterator::for_compactor(table_iters);
    // Both sides are wrapped in the same watermark-skipping iterators so that
    // watermark filtering does not produce spurious differences.
    let left_iter = {
        let skip_watermark_iter = PkPrefixSkipWatermarkIterator::new(
            iter,
            PkPrefixSkipWatermarkState::from_safe_epoch_watermarks(
                compact_task.pk_prefix_table_watermarks.clone(),
            ),
        );

        let combine_iter = NonPkPrefixSkipWatermarkIterator::new(
            skip_watermark_iter,
            NonPkPrefixSkipWatermarkState::from_safe_epoch_watermarks(
                compact_task.non_pk_prefix_table_watermarks.clone(),
                compaction_catalog_agent_ref.clone(),
            ),
        );

        // Full-range user iterator reading at MAX epoch.
        UserIterator::new(
            combine_iter,
            (Bound::Unbounded, Bound::Unbounded),
            u64::MAX,
            0,
            None,
        )
    };
    // The "output" side: the sorted SSTs the compaction produced.
    let iter = ConcatSstableIterator::new(
        compact_task.existing_table_ids.clone(),
        compact_task.sorted_output_ssts.clone(),
        KeyRange::inf(),
        context.sstable_store.clone(),
        Arc::new(TaskProgress::default()),
        context.storage_opts.compactor_iter_max_io_retry_times,
    );
    let right_iter = {
        let skip_watermark_iter = PkPrefixSkipWatermarkIterator::new(
            iter,
            PkPrefixSkipWatermarkState::from_safe_epoch_watermarks(
                compact_task.pk_prefix_table_watermarks.clone(),
            ),
        );

        let combine_iter = NonPkPrefixSkipWatermarkIterator::new(
            skip_watermark_iter,
            NonPkPrefixSkipWatermarkState::from_safe_epoch_watermarks(
                compact_task.non_pk_prefix_table_watermarks.clone(),
                compaction_catalog_agent_ref,
            ),
        );

        UserIterator::new(
            combine_iter,
            (Bound::Unbounded, Bound::Unbounded),
            u64::MAX,
            0,
            None,
        )
    };

    check_result(left_iter, right_iter).await
}
429
/// Validates a flush by comparing the source iterator (`left_iter`) against
/// the flushed SSTs (`sort_ssts`) entry-by-entry.
pub async fn check_flush_result<I: HummockIterator<Direction = Forward>>(
    left_iter: UserIterator<I>,
    existing_table_ids: Vec<StateTableId>,
    sort_ssts: Vec<SstableInfo>,
    context: CompactorContext,
) -> HummockResult<bool> {
    let iter = ConcatSstableIterator::new(
        existing_table_ids.clone(),
        sort_ssts.clone(),
        KeyRange::inf(),
        context.sstable_store.clone(),
        Arc::new(TaskProgress::default()),
        // 0 retry times — verification reads are not retried.
        // NOTE(review): inferred from the literal; confirm intent.
        0,
    );
    // Full-range user iterator over the flushed SSTs, reading at MAX epoch.
    let right_iter = UserIterator::new(
        iter,
        (Bound::Unbounded, Bound::Unbounded),
        u64::MAX,
        0,
        None,
    );
    check_result(left_iter, right_iter).await
}
453
454async fn check_result<
455 I1: HummockIterator<Direction = Forward>,
456 I2: HummockIterator<Direction = Forward>,
457>(
458 mut left_iter: UserIterator<I1>,
459 mut right_iter: UserIterator<I2>,
460) -> HummockResult<bool> {
461 left_iter.rewind().await?;
462 right_iter.rewind().await?;
463 let mut right_count = 0;
464 let mut left_count = 0;
465 while left_iter.is_valid() && right_iter.is_valid() {
466 if left_iter.key() != right_iter.key() {
467 tracing::error!(
468 "The key of input and output not equal. key: {:?} vs {:?}",
469 left_iter.key(),
470 right_iter.key()
471 );
472 return Ok(false);
473 }
474 if left_iter.value() != right_iter.value() {
475 tracing::error!(
476 "The value of input and output not equal. key: {:?}, value: {:?} vs {:?}",
477 left_iter.key(),
478 left_iter.value(),
479 right_iter.value()
480 );
481 return Ok(false);
482 }
483 left_iter.next().await?;
484 right_iter.next().await?;
485 left_count += 1;
486 right_count += 1;
487 }
488 while left_iter.is_valid() {
489 left_count += 1;
490 left_iter.next().await?;
491 }
492 while right_iter.is_valid() {
493 right_count += 1;
494 right_iter.next().await?;
495 }
496 if left_count != right_count {
497 tracing::error!(
498 "The key count of input and output not equal: {} vs {}",
499 left_count,
500 right_count
501 );
502 return Ok(false);
503 }
504 Ok(true)
505}
506
507pub fn optimize_by_copy_block(compact_task: &CompactTask, context: &CompactorContext) -> bool {
508 let sstable_infos = compact_task
509 .input_ssts
510 .iter()
511 .flat_map(|level| level.table_infos.iter())
512 .filter(|table_info| {
513 let table_ids = &table_info.table_ids;
514 table_ids
515 .iter()
516 .any(|table_id| compact_task.existing_table_ids.contains(table_id))
517 })
518 .cloned()
519 .collect_vec();
520 let compaction_size = sstable_infos
521 .iter()
522 .map(|table_info| table_info.sst_size)
523 .sum::<u64>();
524
525 let all_ssts_are_blocked_filter = sstable_infos
526 .iter()
527 .all(|table_info| table_info.bloom_filter_kind == BloomFilterType::Blocked);
528
529 let delete_key_count = sstable_infos
530 .iter()
531 .map(|table_info| table_info.stale_key_count + table_info.range_tombstone_count)
532 .sum::<u64>();
533 let total_key_count = sstable_infos
534 .iter()
535 .map(|table_info| table_info.total_key_count)
536 .sum::<u64>();
537
538 let single_table = compact_task.build_compact_table_ids().len() == 1;
539 context.storage_opts.enable_fast_compaction
540 && all_ssts_are_blocked_filter
541 && !compact_task.contains_range_tombstone()
542 && !compact_task.contains_ttl()
543 && !compact_task.contains_split_sst()
544 && single_table
545 && compact_task.target_level > 0
546 && compact_task.input_ssts.len() == 2
547 && compaction_size < context.storage_opts.compactor_fast_max_compact_task_size
548 && delete_key_count * 100
549 < context.storage_opts.compactor_fast_max_compact_delete_ratio as u64 * total_key_count
550 && compact_task.task_type == PbTaskType::Dynamic
551}
552
553pub async fn generate_splits_for_task(
554 compact_task: &mut CompactTask,
555 context: &CompactorContext,
556 optimize_by_copy_block: bool,
557) -> HummockResult<()> {
558 let sstable_infos = compact_task
559 .input_ssts
560 .iter()
561 .flat_map(|level| level.table_infos.iter())
562 .filter(|table_info| {
563 let table_ids = &table_info.table_ids;
564 table_ids
565 .iter()
566 .any(|table_id| compact_task.existing_table_ids.contains(table_id))
567 })
568 .cloned()
569 .collect_vec();
570 let compaction_size = sstable_infos
571 .iter()
572 .map(|table_info| table_info.sst_size)
573 .sum::<u64>();
574
575 if !optimize_by_copy_block {
576 let splits = generate_splits(
577 &sstable_infos,
578 compaction_size,
579 context,
580 compact_task.max_sub_compaction,
581 )
582 .await?;
583 if !splits.is_empty() {
584 compact_task.splits = splits;
585 }
586 return Ok(());
587 }
588
589 Ok(())
590}
591
592pub fn metrics_report_for_task(compact_task: &CompactTask, context: &CompactorContext) {
593 let group_label = compact_task.compaction_group_id.to_string();
594 let cur_level_label = compact_task.input_ssts[0].level_idx.to_string();
595 let select_table_infos = compact_task
596 .input_ssts
597 .iter()
598 .filter(|level| level.level_idx != compact_task.target_level)
599 .flat_map(|level| level.table_infos.iter())
600 .collect_vec();
601 let target_table_infos = compact_task
602 .input_ssts
603 .iter()
604 .filter(|level| level.level_idx == compact_task.target_level)
605 .flat_map(|level| level.table_infos.iter())
606 .collect_vec();
607 let select_size = select_table_infos
608 .iter()
609 .map(|table| table.sst_size)
610 .sum::<u64>();
611 context
612 .compactor_metrics
613 .compact_read_current_level
614 .with_label_values(&[&group_label, &cur_level_label])
615 .inc_by(select_size);
616 context
617 .compactor_metrics
618 .compact_read_sstn_current_level
619 .with_label_values(&[&group_label, &cur_level_label])
620 .inc_by(select_table_infos.len() as u64);
621
622 let target_level_read_bytes = target_table_infos.iter().map(|t| t.sst_size).sum::<u64>();
623 let next_level_label = compact_task.target_level.to_string();
624 context
625 .compactor_metrics
626 .compact_read_next_level
627 .with_label_values(&[&group_label, next_level_label.as_str()])
628 .inc_by(target_level_read_bytes);
629 context
630 .compactor_metrics
631 .compact_read_sstn_next_level
632 .with_label_values(&[&group_label, next_level_label.as_str()])
633 .inc_by(target_table_infos.len() as u64);
634}
635
636pub fn calculate_task_parallelism(compact_task: &CompactTask, context: &CompactorContext) -> usize {
637 let optimize_by_copy_block = optimize_by_copy_block(compact_task, context);
638
639 if optimize_by_copy_block {
640 return 1;
641 }
642
643 let sstable_infos = compact_task
644 .input_ssts
645 .iter()
646 .flat_map(|level| level.table_infos.iter())
647 .filter(|table_info| {
648 let table_ids = &table_info.table_ids;
649 table_ids
650 .iter()
651 .any(|table_id| compact_task.existing_table_ids.contains(table_id))
652 })
653 .cloned()
654 .collect_vec();
655 let compaction_size = sstable_infos
656 .iter()
657 .map(|table_info| table_info.sst_size)
658 .sum::<u64>();
659 let parallel_compact_size = (context.storage_opts.parallel_compact_size_mb as u64) << 20;
660 calculate_task_parallelism_impl(
661 context.compaction_executor.worker_num(),
662 parallel_compact_size,
663 compaction_size,
664 compact_task.max_sub_compaction,
665 )
666}
667
/// Derives sub-compaction parallelism from the task size.
///
/// The task is divided into `compaction_size / parallel_compact_size` shares
/// (rounded up), capped by both `max_sub_compaction` and the number of
/// available compaction workers.
///
/// Always returns at least 1: the original could return 0 (e.g. when
/// `max_sub_compaction` or `compaction_size` is 0), and callers divide by the
/// result (`generate_splits_fast` computes `indexes.len() / parallelism`),
/// which would panic on zero.
pub fn calculate_task_parallelism_impl(
    worker_num: usize,
    parallel_compact_size: u64,
    compaction_size: u64,
    max_sub_compaction: u32,
) -> usize {
    let parallelism = compaction_size.div_ceil(parallel_compact_size);
    worker_num
        .min(parallelism.min(max_sub_compaction as u64) as usize)
        .max(1)
}