use std::collections::{BTreeMap, HashMap};
use std::marker::PhantomData;
use std::ops::Bound;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};

use bytes::Bytes;
use itertools::Itertools;
use risingwave_common::catalog::TableId;
use risingwave_common::constants::hummock::CompactionFilterFlag;
use risingwave_hummock_sdk::compact_task::CompactTask;
use risingwave_hummock_sdk::key::FullKey;
use risingwave_hummock_sdk::key_range::KeyRange;
use risingwave_hummock_sdk::sstable_info::SstableInfo;
use risingwave_hummock_sdk::table_stats::TableStatsMap;
use risingwave_hummock_sdk::{EpochWithGap, KeyComparator, can_concat};
use risingwave_pb::hummock::compact_task::PbTaskType;
use risingwave_pb::hummock::{BloomFilterType, PbLevelType, PbTableSchema};
use tokio::time::Instant;

pub use super::context::CompactorContext;
use crate::compaction_catalog_manager::CompactionCatalogAgentRef;
use crate::hummock::compactor::{
    ConcatSstableIterator, MultiCompactionFilter, TaskProgress, TtlCompactionFilter,
};
use crate::hummock::iterator::{
    Forward, HummockIterator, MergeIterator, NonPkPrefixSkipWatermarkIterator,
    NonPkPrefixSkipWatermarkState, PkPrefixSkipWatermarkIterator, PkPrefixSkipWatermarkState,
    UserIterator,
};
use crate::hummock::multi_builder::TableBuilderFactory;
use crate::hummock::sstable::DEFAULT_ENTRY_SIZE;
use crate::hummock::{
    CachePolicy, FilterBuilder, GetObjectId, HummockResult, MemoryLimiter, SstableBuilder,
    SstableBuilderOptions, SstableWriterFactory, SstableWriterOptions,
};
use crate::monitor::StoreLocalStatistic;

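/// Builder factory used by the compactor: each `open_builder` call allocates a new
/// SST object id (the allocation RPC latency is accumulated into `remote_rpc_cost`,
/// in microseconds) and wires up the writer, bloom-filter builder, and memory
/// limiter for the new `SstableBuilder`.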
pub struct RemoteBuilderFactory<W: SstableWriterFactory, F: FilterBuilder> {
    pub object_id_getter: Arc<dyn GetObjectId>,
    pub limiter: Arc<MemoryLimiter>,
    pub options: SstableBuilderOptions,
    pub policy: CachePolicy,
    pub remote_rpc_cost: Arc<AtomicU64>,
    pub compaction_catalog_agent_ref: CompactionCatalogAgentRef,
    pub sstable_writer_factory: W,
    pub _phantom: PhantomData<F>,
}

#[async_trait::async_trait]
impl<W: SstableWriterFactory, F: FilterBuilder> TableBuilderFactory for RemoteBuilderFactory<W, F> {
    type Filter = F;
    type Writer = W::Writer;

    async fn open_builder(&mut self) -> HummockResult<SstableBuilder<Self::Writer, Self::Filter>> {
        let timer = Instant::now();
        let object_id = self.object_id_getter.get_new_sst_object_id().await?;
        // Record the id-allocation RPC latency in microseconds.
        let cost = (timer.elapsed().as_secs_f64() * 1_000_000.0).round() as u64;
        self.remote_rpc_cost.fetch_add(cost, Ordering::Relaxed);
        let writer_options = SstableWriterOptions {
            capacity_hint: Some(self.options.capacity + self.options.block_capacity),
            tracker: None,
            policy: self.policy,
        };
        let writer = self
            .sstable_writer_factory
            .create_sst_writer(object_id, writer_options)
            .await?;
        let builder = SstableBuilder::new(
            object_id,
            writer,
            Self::Filter::create(
                self.options.bloom_false_positive,
                self.options.capacity / DEFAULT_ENTRY_SIZE + 1,
            ),
            self.options.clone(),
            self.compaction_catalog_agent_ref.clone(),
            Some(self.limiter.clone()),
        );
        Ok(builder)
    }
}

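/// Counters collected over one compaction task: the per-table stats delta produced
/// by dropped keys, plus the total and dropped key counts seen by the iterator.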
#[derive(Default, Debug)]
pub struct CompactionStatistics {
    pub delta_drop_stat: TableStatsMap,

    pub iter_total_key_counts: u64,
    pub iter_drop_key_counts: u64,
}

impl CompactionStatistics {
    #[expect(dead_code)]
    fn delete_ratio(&self) -> Option<u64> {
        if self.iter_total_key_counts == 0 {
            return None;
        }

        // Expressed as a percentage: the raw integer quotient
        // `iter_drop_key_counts / iter_total_key_counts` would truncate to 0
        // whenever fewer than all keys are dropped.
        Some(self.iter_drop_key_counts * 100 / self.iter_total_key_counts)
    }
}

#[derive(Clone, Default)]
pub struct TaskConfig {
    pub(crate) key_range: KeyRange,
    pub(crate) cache_policy: CachePolicy,
    pub(crate) gc_delete_keys: bool,
    pub(crate) retain_multiple_version: bool,
    pub(crate) use_block_based_filter: bool,

    pub(crate) table_vnode_partition: BTreeMap<TableId, u32>,
    pub(crate) table_schemas: HashMap<TableId, PbTableSchema>,
    pub(crate) disable_drop_column_optimization: bool,
}

impl TaskConfig {
    #[cfg(any(test, feature = "test"))]
    pub fn for_test(
        key_range: KeyRange,
        cache_policy: CachePolicy,
        gc_delete_keys: bool,
        use_block_based_filter: bool,
        table_schemas: HashMap<TableId, PbTableSchema>,
    ) -> Self {
        Self {
            key_range,
            cache_policy,
            gc_delete_keys,
            retain_multiple_version: false,
            use_block_based_filter,
            table_vnode_partition: BTreeMap::default(),
            table_schemas,
            disable_drop_column_optimization: false,
        }
    }

    #[cfg(any(test, feature = "test"))]
    pub fn with_disable_drop_column_optimization(mut self, disable: bool) -> Self {
        self.disable_drop_column_optimization = disable;
        self
    }
}

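/// Builds the compaction filter chain from the task's `compaction_filter_mask`.
/// In the code below only the TTL flag is expanded: every table with a positive
/// `retention_seconds` is registered with a `TtlCompactionFilter` evaluated
/// against the task's `current_epoch_time`.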
pub fn build_multi_compaction_filter(compact_task: &CompactTask) -> MultiCompactionFilter {
    let mut multi_filter = MultiCompactionFilter::default();
    let compaction_filter_flag =
        CompactionFilterFlag::from_bits(compact_task.compaction_filter_mask).unwrap_or_default();
    if compaction_filter_flag.contains(CompactionFilterFlag::TTL) {
        let id_to_ttl = compact_task
            .table_options
            .iter()
            .filter_map(|(id, option)| {
                option
                    .retention_seconds
                    .and_then(|ttl| if ttl > 0 { Some((*id, ttl)) } else { None })
            })
            .collect();

        let ttl_filter = Box::new(TtlCompactionFilter::new(
            id_to_ttl,
            compact_task.current_epoch_time,
        ));
        multi_filter.register(ttl_filter);
    }

    multi_filter
}

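/// Fast split generation that avoids loading block metadata: each input SST
/// contributes its key-range bounds as candidate cut points, and the sorted,
/// deduplicated candidates are divided into `parallelism` groups of roughly equal
/// count. Returns an empty `Vec` (meaning "don't split") when there are too few
/// distinct boundary keys.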
fn generate_splits_fast(
    sstable_infos: &[&SstableInfo],
    compaction_size: u64,
    context: &CompactorContext,
    max_sub_compaction: u32,
) -> Vec<KeyRange> {
    let worker_num = context.compaction_executor.worker_num();
    let parallel_compact_size = (context.storage_opts.parallel_compact_size_mb as u64) << 20;

    let parallelism = calculate_task_parallelism_impl(
        worker_num,
        parallel_compact_size,
        compaction_size,
        max_sub_compaction,
    );
    let mut indexes = vec![];
    for sst in sstable_infos {
        let key_range = &sst.key_range;
        indexes.push(
            FullKey {
                user_key: FullKey::decode(&key_range.left).user_key,
                epoch_with_gap: EpochWithGap::new_max_epoch(),
            }
            .encode(),
        );
        indexes.push(
            FullKey {
                user_key: FullKey::decode(&key_range.right).user_key,
                epoch_with_gap: EpochWithGap::new_max_epoch(),
            }
            .encode(),
        );
    }
    indexes.sort_by(|a, b| KeyComparator::compare_encoded_full_key(a.as_ref(), b.as_ref()));
    indexes.dedup();
    if indexes.len() <= parallelism {
        return vec![];
    }

    let mut splits = vec![];
    splits.push(KeyRange::default());
    let parallel_key_count = indexes.len() / parallelism;
    let mut last_split_key_count = 0;
    for key in indexes {
        if last_split_key_count >= parallel_key_count {
            splits.last_mut().unwrap().right = Bytes::from(key.clone());
            splits.push(KeyRange::new(Bytes::from(key.clone()), Bytes::default()));
            last_split_key_count = 0;
        }
        last_split_key_count += 1;
    }

    splits
}

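/// Splits the task into disjoint key ranges so sub-compactions can run in
/// parallel. Tasks no larger than `parallel_compact_size` stay whole (an empty
/// `Vec` means "don't split"). Tasks spanning more files than
/// `compactor_max_preload_meta_file_count` take the boundary-key fast path above;
/// otherwise block metadata is preloaded and cut points are placed so each split
/// covers roughly `max(compaction_size / parallelism, parallel_compact_size)`
/// bytes of block data.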
pub async fn generate_splits(
    sstable_infos: &[&SstableInfo],
    compaction_size: u64,
    context: &CompactorContext,
    max_sub_compaction: u32,
) -> HummockResult<Vec<KeyRange>> {
    let parallel_compact_size = (context.storage_opts.parallel_compact_size_mb as u64) << 20;
    if compaction_size > parallel_compact_size {
        if sstable_infos.len() > context.storage_opts.compactor_max_preload_meta_file_count {
            return Ok(generate_splits_fast(
                sstable_infos,
                compaction_size,
                context,
                max_sub_compaction,
            ));
        }
        let mut indexes = vec![];
        for sstable_info in sstable_infos {
            let sstable = context
                .sstable_store
                .sstable(sstable_info, &mut StoreLocalStatistic::default())
                .await?;
            indexes.extend(sstable.meta.block_metas.iter().map(|block| {
                let data_size = block.len;
                let full_key = FullKey {
                    user_key: FullKey::decode(&block.smallest_key).user_key,
                    epoch_with_gap: EpochWithGap::new_max_epoch(),
                }
                .encode();
                (data_size as u64, full_key)
            }));
        }
        indexes.sort_by(|a, b| KeyComparator::compare_encoded_full_key(a.1.as_ref(), b.1.as_ref()));
        let mut splits = vec![];
        splits.push(KeyRange::default());

        let parallelism = calculate_task_parallelism_impl(
            context.compaction_executor.worker_num(),
            parallel_compact_size,
            compaction_size,
            max_sub_compaction,
        );

        let sub_compaction_data_size =
            std::cmp::max(compaction_size / parallelism as u64, parallel_compact_size);

        if parallelism > 1 {
            let mut last_buffer_size = 0;
            let mut last_key: Vec<u8> = vec![];
            let mut remaining_size = indexes.iter().map(|block| block.0).sum::<u64>();
            for (data_size, key) in indexes {
                if last_buffer_size >= sub_compaction_data_size
                    && !last_key.eq(&key)
                    && remaining_size > parallel_compact_size
                {
                    splits.last_mut().unwrap().right = Bytes::from(key.clone());
                    splits.push(KeyRange::new(Bytes::from(key.clone()), Bytes::default()));
                    last_buffer_size = data_size;
                } else {
                    last_buffer_size += data_size;
                }
                remaining_size -= data_size;
                last_key = key;
            }
            return Ok(splits);
        }
    }

    Ok(vec![])
}

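/// Estimates the capacity of one output SST: the task's target file size, clamped
/// by the configured `sstable_size_mb` and by the total uncompressed input size
/// (the output can never exceed what is read in).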
pub fn estimate_task_output_capacity(context: CompactorContext, task: &CompactTask) -> usize {
    let max_target_file_size = context.storage_opts.sstable_size_mb as usize * (1 << 20);
    let total_input_uncompressed_file_size = task
        .read_input_ssts()
        .map(|table| table.uncompressed_file_size)
        .sum::<u64>();

    let capacity = std::cmp::min(task.target_file_size as usize, max_target_file_size);
    std::cmp::min(capacity, total_input_uncompressed_file_size as usize)
}

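/// Replays a finished compaction to verify it: the input levels and the sorted
/// output SSTs are wrapped in identical watermark-skipping `UserIterator` stacks
/// and compared entry by entry. TTL tasks are skipped because they legitimately
/// drop expired keys, so input and output need not match.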
pub async fn check_compaction_result(
    compact_task: &CompactTask,
    context: CompactorContext,
    compaction_catalog_agent_ref: CompactionCatalogAgentRef,
) -> HummockResult<bool> {
    if compact_task.contains_ttl() {
        return Ok(true);
    }

    let mut table_iters = Vec::new();

    for level in &compact_task.input_ssts {
        if level.level_type == PbLevelType::Nonoverlapping {
            let tables = level.read_sstable_infos().cloned().collect_vec();
            if tables.is_empty() {
                continue;
            }
            debug_assert!(can_concat(&tables));

            table_iters.push(ConcatSstableIterator::new(
                tables,
                KeyRange::inf(),
                context.sstable_store.clone(),
                Arc::new(TaskProgress::default()),
                context.storage_opts.compactor_iter_max_io_retry_times,
            ));
        } else {
            for table_info in level.read_sstable_infos().cloned() {
                table_iters.push(ConcatSstableIterator::new(
                    vec![table_info],
                    KeyRange::inf(),
                    context.sstable_store.clone(),
                    Arc::new(TaskProgress::default()),
                    context.storage_opts.compactor_iter_max_io_retry_times,
                ));
            }
        }
    }

    let iter = MergeIterator::for_compactor(table_iters);
    let left_iter = {
        let skip_watermark_iter = PkPrefixSkipWatermarkIterator::new(
            iter,
            PkPrefixSkipWatermarkState::from_safe_epoch_watermarks(
                compact_task.pk_prefix_table_watermarks.clone(),
            ),
        );

        let combine_iter = NonPkPrefixSkipWatermarkIterator::new(
            skip_watermark_iter,
            NonPkPrefixSkipWatermarkState::from_safe_epoch_watermarks(
                compact_task.non_pk_prefix_table_watermarks.clone(),
                compaction_catalog_agent_ref.clone(),
            ),
        );

        UserIterator::new(
            combine_iter,
            (Bound::Unbounded, Bound::Unbounded),
            u64::MAX,
            0,
            None,
        )
    };
    let iter = ConcatSstableIterator::new(
        compact_task.sorted_output_ssts.clone(),
        KeyRange::inf(),
        context.sstable_store.clone(),
        Arc::new(TaskProgress::default()),
        context.storage_opts.compactor_iter_max_io_retry_times,
    );
    let right_iter = {
        let skip_watermark_iter = PkPrefixSkipWatermarkIterator::new(
            iter,
            PkPrefixSkipWatermarkState::from_safe_epoch_watermarks(
                compact_task.pk_prefix_table_watermarks.clone(),
            ),
        );

        let combine_iter = NonPkPrefixSkipWatermarkIterator::new(
            skip_watermark_iter,
            NonPkPrefixSkipWatermarkState::from_safe_epoch_watermarks(
                compact_task.non_pk_prefix_table_watermarks.clone(),
                compaction_catalog_agent_ref,
            ),
        );

        UserIterator::new(
            combine_iter,
            (Bound::Unbounded, Bound::Unbounded),
            u64::MAX,
            0,
            None,
        )
    };

    check_result(left_iter, right_iter).await
}

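/// The flush-time analogue of `check_compaction_result`: compares the iterator
/// over the flushed input against the SSTs that the flush produced.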
pub async fn check_flush_result<I: HummockIterator<Direction = Forward>>(
    left_iter: UserIterator<I>,
    sort_ssts: Vec<SstableInfo>,
    context: CompactorContext,
) -> HummockResult<bool> {
    let iter = ConcatSstableIterator::new(
        sort_ssts,
        KeyRange::inf(),
        context.sstable_store.clone(),
        Arc::new(TaskProgress::default()),
        0,
    );
    let right_iter = UserIterator::new(
        iter,
        (Bound::Unbounded, Bound::Unbounded),
        u64::MAX,
        0,
        None,
    );
    check_result(left_iter, right_iter).await
}

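/// Advances both iterators in lockstep, returning `Ok(false)` on the first
/// mismatched key, value, or total key count; iterator errors propagate as `Err`.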
async fn check_result<
    I1: HummockIterator<Direction = Forward>,
    I2: HummockIterator<Direction = Forward>,
>(
    mut left_iter: UserIterator<I1>,
    mut right_iter: UserIterator<I2>,
) -> HummockResult<bool> {
    left_iter.rewind().await?;
    right_iter.rewind().await?;
    let mut right_count = 0;
    let mut left_count = 0;
    while left_iter.is_valid() && right_iter.is_valid() {
        if left_iter.key() != right_iter.key() {
            tracing::error!(
                "The keys of input and output are not equal. key: {:?} vs {:?}",
                left_iter.key(),
                right_iter.key()
            );
            return Ok(false);
        }
        if left_iter.value() != right_iter.value() {
            tracing::error!(
                "The values of input and output are not equal. key: {:?}, value: {:?} vs {:?}",
                left_iter.key(),
                left_iter.value(),
                right_iter.value()
            );
            return Ok(false);
        }
        left_iter.next().await?;
        right_iter.next().await?;
        left_count += 1;
        right_count += 1;
    }
    // Drain whichever side is longer so the total key counts can be compared.
    while left_iter.is_valid() {
        left_count += 1;
        left_iter.next().await?;
    }
    while right_iter.is_valid() {
        right_count += 1;
        right_iter.next().await?;
    }
    if left_count != right_count {
        tracing::error!(
            "The key counts of input and output are not equal: {} vs {}",
            left_count,
            right_count
        );
        return Ok(false);
    }
    Ok(true)
}

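/// Whether this task qualifies for the fast "copy block" compaction path, which
/// copies whole blocks instead of decoding and re-encoding every key-value pair.
/// See `optimize_by_copy_block_with_input` for the exact conditions.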
pub fn optimize_by_copy_block(compact_task: &CompactTask, context: &CompactorContext) -> bool {
    let input_ssts = compact_task.read_input_ssts().collect_vec();
    let compaction_size = input_ssts_size(&input_ssts);
    optimize_by_copy_block_with_input(compact_task, context, &input_ssts, compaction_size)
}

fn optimize_by_copy_block_with_input(
    compact_task: &CompactTask,
    context: &CompactorContext,
    input_ssts: &[&SstableInfo],
    compaction_size: u64,
) -> bool {
    let all_ssts_are_blocked_filter = input_ssts
        .iter()
        .all(|table_info| table_info.bloom_filter_kind == BloomFilterType::Blocked);

    let delete_key_count = input_ssts
        .iter()
        .map(|table_info| table_info.stale_key_count + table_info.range_tombstone_count)
        .sum::<u64>();
    let total_key_count = input_ssts
        .iter()
        .map(|table_info| table_info.total_key_count)
        .sum::<u64>();

    let single_table = compact_task.get_table_ids_from_input_ssts().count() == 1;
    context.storage_opts.enable_fast_compaction
        && all_ssts_are_blocked_filter
        && !compact_task.contains_range_tombstone()
        && !compact_task.contains_ttl()
        && !compact_task.contains_split_sst()
        && single_table
        && compact_task.target_level > 0
        && compact_task.input_ssts.len() == 2
        && compaction_size < context.storage_opts.compactor_fast_max_compact_task_size
        && delete_key_count * 100
            < context.storage_opts.compactor_fast_max_compact_delete_ratio as u64 * total_key_count
        && compact_task.task_type == PbTaskType::Dynamic
}

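/// Populates `compact_task.splits` for tasks on the normal path; copy-block tasks
/// always run single-threaded, so no splits are generated for them.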
pub async fn generate_splits_for_task(
    compact_task: &mut CompactTask,
    context: &CompactorContext,
    optimize_by_copy_block: bool,
) -> HummockResult<()> {
    if !optimize_by_copy_block {
        let input_ssts = compact_task.read_input_ssts().collect_vec();
        let compaction_size = input_ssts_size(&input_ssts);
        let splits = generate_splits(
            &input_ssts,
            compaction_size,
            context,
            compact_task.max_sub_compaction,
        )
        .await?;
        if !splits.is_empty() {
            compact_task.splits = splits;
        }
    }

    Ok(())
}

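/// Reports read bytes and SST counts for the task's input, split between the
/// current (select) levels and the target level.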
pub fn metrics_report_for_task(compact_task: &CompactTask, context: &CompactorContext) {
    let group_label = compact_task.compaction_group_id.to_string();
    let cur_level_label = compact_task.input_ssts[0].level_idx.to_string();

    let (select_size, select_count) = read_sstable_size_and_count(
        compact_task
            .input_ssts
            .iter()
            .filter(|level| level.level_idx != compact_task.target_level)
            .flat_map(|level| level.read_sstable_infos()),
    );
    let (target_level_read_bytes, target_count) = read_sstable_size_and_count(
        compact_task
            .input_ssts
            .iter()
            .filter(|level| level.level_idx == compact_task.target_level)
            .flat_map(|level| level.read_sstable_infos()),
    );

    context
        .compactor_metrics
        .compact_read_current_level
        .with_label_values(&[&group_label, &cur_level_label])
        .inc_by(select_size);
    context
        .compactor_metrics
        .compact_read_sstn_current_level
        .with_label_values(&[&group_label, &cur_level_label])
        .inc_by(select_count as u64);

    let next_level_label = compact_task.target_level.to_string();
    context
        .compactor_metrics
        .compact_read_next_level
        .with_label_values(&[&group_label, &next_level_label])
        .inc_by(target_level_read_bytes);
    context
        .compactor_metrics
        .compact_read_sstn_next_level
        .with_label_values(&[&group_label, &next_level_label])
        .inc_by(target_count as u64);
}

fn read_sstable_size_and_count<'a>(
    sstable_infos: impl IntoIterator<Item = &'a SstableInfo>,
) -> (u64, usize) {
    sstable_infos
        .into_iter()
        .fold((0, 0), |(size, count), table_info| {
            (size + table_info.sst_size, count + 1)
        })
}

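/// Parallelism for a compaction task: copy-block tasks always run with
/// parallelism 1; all other tasks are sized by `calculate_task_parallelism_impl`.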
pub fn calculate_task_parallelism(compact_task: &CompactTask, context: &CompactorContext) -> usize {
    let input_ssts = compact_task.read_input_ssts().collect_vec();
    let compaction_size = input_ssts_size(&input_ssts);
    let optimize_by_copy_block =
        optimize_by_copy_block_with_input(compact_task, context, &input_ssts, compaction_size);

    if optimize_by_copy_block {
        return 1;
    }

    let parallel_compact_size = (context.storage_opts.parallel_compact_size_mb as u64) << 20;
    calculate_task_parallelism_impl(
        context.compaction_executor.worker_num(),
        parallel_compact_size,
        compaction_size,
        compact_task.max_sub_compaction,
    )
}

fn input_ssts_size(input_ssts: &[&SstableInfo]) -> u64 {
    input_ssts
        .iter()
        .map(|table_info| table_info.sst_size)
        .sum()
}

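/// `ceil(compaction_size / parallel_compact_size)`, clamped by both
/// `max_sub_compaction` and the number of compaction worker threads.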
pub fn calculate_task_parallelism_impl(
    worker_num: usize,
    parallel_compact_size: u64,
    compaction_size: u64,
    max_sub_compaction: u32,
) -> usize {
    let parallelism = compaction_size.div_ceil(parallel_compact_size);
    worker_num.min(parallelism.min(max_sub_compaction as u64) as usize)
}
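
// A minimal sketch (added for illustration, not part of the original task logic)
// exercising the clamping behavior of `calculate_task_parallelism_impl`. The
// sizes and worker counts below are arbitrary.
#[cfg(test)]
mod task_parallelism_sketch {
    use super::calculate_task_parallelism_impl;

    #[test]
    fn parallelism_is_clamped_by_workers_and_max_sub_compaction() {
        // A 10 GiB task cut into 512 MiB splits wants ceil(10 GiB / 512 MiB) = 20
        // sub-compactions before any clamping.
        let compaction_size = 10 * (1u64 << 30);
        let parallel_compact_size = 512 * (1u64 << 20);

        // Clamped by `max_sub_compaction`.
        assert_eq!(
            calculate_task_parallelism_impl(8, parallel_compact_size, compaction_size, 4),
            4
        );
        // Clamped by the worker count.
        assert_eq!(
            calculate_task_parallelism_impl(2, parallel_compact_size, compaction_size, 32),
            2
        );
        // Neither clamp applies: the raw split count wins.
        assert_eq!(
            calculate_task_parallelism_impl(64, parallel_compact_size, compaction_size, 32),
            20
        );
    }
}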