1use std::collections::{BinaryHeap, HashMap, HashSet};
16use std::sync::Arc;
17
18use await_tree::{InstrumentAwait, SpanExt};
19use bytes::Bytes;
20use futures::{FutureExt, StreamExt, stream};
21use itertools::Itertools;
22use risingwave_common::catalog::TableId;
23use risingwave_common::util::value_encoding::column_aware_row_encoding::try_drop_invalid_columns;
24use risingwave_hummock_sdk::compact::{
25 compact_task_to_string, estimate_memory_for_compact_task, statistics_compact_task,
26};
27use risingwave_hummock_sdk::compact_task::CompactTask;
28use risingwave_hummock_sdk::compaction_group::StateTableId;
29use risingwave_hummock_sdk::key::{FullKey, FullKeyTracker};
30use risingwave_hummock_sdk::key_range::{KeyRange, KeyRangeCommon};
31use risingwave_hummock_sdk::sstable_info::SstableInfo;
32use risingwave_hummock_sdk::table_stats::{TableStats, TableStatsMap, add_table_stats_map};
33use risingwave_hummock_sdk::{
34 HummockSstableObjectId, KeyComparator, can_concat, compact_task_output_to_string,
35 full_key_can_concat,
36};
37use risingwave_pb::hummock::compact_task::TaskStatus;
38use risingwave_pb::hummock::{LevelType, PbSstableFilterType};
39use thiserror_ext::AsReport;
40use tokio::sync::oneshot::Receiver;
41
42use super::iterator::MonitoredCompactorIterator;
43use super::task_progress::TaskProgress;
44use super::{CompactionStatistics, TaskConfig};
45use crate::compaction_catalog_manager::{CompactionCatalogAgentRef, CompactionCatalogManagerRef};
46use crate::hummock::compactor::compaction_utils::{
47 blocked_xor_filter_key_count_threshold, build_multi_compaction_filter,
48 estimate_output_key_count_for_task, estimate_task_output_capacity, generate_splits_for_task,
49 metrics_report_for_task, optimize_by_copy_block,
50};
51use crate::hummock::compactor::iterator::ConcatSstableIterator;
52use crate::hummock::compactor::task_progress::TaskProgressGuard;
53use crate::hummock::compactor::{
54 CompactOutput, CompactionFilter, Compactor, CompactorContext, await_tree_key,
55 fast_compactor_runner,
56};
57use crate::hummock::iterator::{
58 Forward, HummockIterator, MergeIterator, NonPkPrefixSkipWatermarkIterator,
59 NonPkPrefixSkipWatermarkState, PkPrefixSkipWatermarkIterator, PkPrefixSkipWatermarkState,
60 ValueMeta, ValueSkipWatermarkIterator, ValueSkipWatermarkState,
61};
62use crate::hummock::multi_builder::{CapacitySplitTableBuilder, TableBuilderFactory};
63use crate::hummock::utils::MemoryTracker;
64use crate::hummock::value::HummockValue;
65use crate::hummock::{
66 BlockedXor8FilterBuilder, BlockedXor16FilterBuilder, CachePolicy, CompressionAlgorithm,
67 GetObjectId, HummockError, HummockResult, SstableBuilderOptions, SstableStoreRef,
68};
69use crate::monitor::{CompactorMetrics, StoreLocalStatistic};
/// Compacts a single split (key sub-range) of a [`CompactTask`].
///
/// A runner is built per split index via `CompactorRunner::new`, and `run` merges the input SSTs
/// overlapping this split and writes the compacted output.
pub struct CompactorRunner {
    /// The task this runner belongs to.
    compact_task: CompactTask,
    /// Compactor configured with this split's key range and SST builder options.
    compactor: Compactor,
    /// Store used to open the input SSTs.
    sstable_store: SstableStoreRef,
    /// Right-exclusive key range of this split; input SSTs not overlapping it are skipped.
    key_range: KeyRange,
    /// Index of this split within `compact_task.splits`.
    split_index: usize,
}
77
78impl CompactorRunner {
79 pub fn new(
80 split_index: usize,
81 context: CompactorContext,
82 task: CompactTask,
83 object_id_getter: Arc<dyn GetObjectId>,
84 ) -> Self {
85 let mut options: SstableBuilderOptions = context.storage_opts.as_ref().into();
86 options.compression_algorithm = match task.compression_algorithm {
87 0 => CompressionAlgorithm::None,
88 1 => CompressionAlgorithm::Lz4,
89 _ => CompressionAlgorithm::Zstd,
90 };
91
92 options.capacity = estimate_task_output_capacity(context.clone(), &task);
93 let estimated_output_key_count =
94 estimate_output_key_count_for_task(&task, options.capacity);
95 options.estimated_output_key_count = Some(estimated_output_key_count);
96 options.filter_hash_prealloc_key_count_cap =
97 blocked_xor_filter_key_count_threshold(task.blocked_xor_filter_kv_count_threshold);
98 options.max_vnode_key_range_bytes = task.effective_max_vnode_key_range_bytes();
99 let sstable_filter_layout =
100 task.sstable_filter_layout_for_output(estimated_output_key_count as u64);
101
102 let key_range = KeyRange {
103 left: task.splits[split_index].left.clone(),
104 right: task.splits[split_index].right.clone(),
105 right_exclusive: true,
106 };
107 let compactor = Compactor::new(
108 context.clone(),
109 options,
110 TaskConfig {
111 key_range: key_range.clone(),
112 cache_policy: CachePolicy::NotFill,
113 gc_delete_keys: task.gc_delete_keys,
114 retain_multiple_version: false,
115 sstable_filter_layout,
116 sstable_filter_type: task.sstable_filter_type,
117 table_vnode_partition: task.table_vnode_partition.clone(),
118 table_schemas: task
119 .table_schemas
120 .iter()
121 .map(|(k, v)| (*k, v.clone()))
122 .collect(),
123 disable_drop_column_optimization: false,
124 },
125 object_id_getter,
126 );
127
128 Self {
129 compactor,
130 compact_task: task,
131 sstable_store: context.sstable_store,
132 key_range,
133 split_index,
134 }
135 }
136
137 pub async fn run(
138 &self,
139 compaction_filter: impl CompactionFilter,
140 compaction_catalog_agent_ref: CompactionCatalogAgentRef,
141 task_progress: Arc<TaskProgress>,
142 ) -> HummockResult<CompactOutput> {
143 let iter =
144 self.build_sst_iter(task_progress.clone(), compaction_catalog_agent_ref.clone())?;
145 let (ssts, compaction_stat) = self
146 .compactor
147 .compact_key_range(
148 iter,
149 compaction_filter,
150 compaction_catalog_agent_ref,
151 Some(task_progress),
152 Some(self.compact_task.task_id),
153 Some(self.split_index),
154 )
155 .await?;
156 Ok((self.split_index, ssts, compaction_stat))
157 }
158
159 fn build_sst_iter(
161 &self,
162 task_progress: Arc<TaskProgress>,
163 compaction_catalog_agent_ref: CompactionCatalogAgentRef,
164 ) -> HummockResult<impl HummockIterator<Direction = Forward> + use<>> {
165 let compactor_iter_max_io_retry_times = self
166 .compactor
167 .context
168 .storage_opts
169 .compactor_iter_max_io_retry_times;
170 let mut table_iters = Vec::new();
171
172 for level in &self.compact_task.input_ssts {
173 let tables = level
174 .read_sstable_infos()
175 .filter(|table_info| self.key_range.full_key_overlap(&table_info.key_range))
176 .cloned()
177 .collect_vec();
178 if tables.is_empty() {
179 continue;
180 }
181
182 if level.level_type == LevelType::Nonoverlapping {
183 debug_assert!(can_concat(&tables));
184 table_iters.push(ConcatSstableIterator::new(
185 tables,
186 self.compactor.task_config.key_range.clone(),
187 self.sstable_store.clone(),
188 task_progress.clone(),
189 compactor_iter_max_io_retry_times,
190 ));
191 } else if tables.len()
192 > self
193 .compactor
194 .context
195 .storage_opts
196 .compactor_max_overlap_sst_count
197 {
198 let sst_groups = partition_overlapping_sstable_infos(tables);
199 tracing::warn!(
200 "COMPACT A LARGE OVERLAPPING LEVEL: try to partition {} ssts with {} groups",
201 sst_groups.iter().map(Vec::len).sum::<usize>(),
202 sst_groups.len()
203 );
204 for (idx, table_infos) in sst_groups.into_iter().enumerate() {
205 assert!(
207 full_key_can_concat(&table_infos),
208 "sst_group idx {:?} table_infos: {:?}",
209 idx,
210 table_infos
211 );
212 table_iters.push(ConcatSstableIterator::new(
213 table_infos,
214 self.compactor.task_config.key_range.clone(),
215 self.sstable_store.clone(),
216 task_progress.clone(),
217 compactor_iter_max_io_retry_times,
218 ));
219 }
220 } else {
221 for table_info in tables {
222 table_iters.push(ConcatSstableIterator::new(
223 vec![table_info],
224 self.compactor.task_config.key_range.clone(),
225 self.sstable_store.clone(),
226 task_progress.clone(),
227 compactor_iter_max_io_retry_times,
228 ));
229 }
230 }
231 }
232
233 let combine_iter = {
238 let skip_watermark_iter = PkPrefixSkipWatermarkIterator::new(
239 MonitoredCompactorIterator::new(
240 MergeIterator::for_compactor(table_iters),
241 task_progress,
242 ),
243 PkPrefixSkipWatermarkState::from_safe_epoch_watermarks(
244 self.compact_task.pk_prefix_table_watermarks.clone(),
245 ),
246 );
247
248 let pk_skip_watermark_iter = NonPkPrefixSkipWatermarkIterator::new(
249 skip_watermark_iter,
250 NonPkPrefixSkipWatermarkState::from_safe_epoch_watermarks(
251 self.compact_task.non_pk_prefix_table_watermarks.clone(),
252 compaction_catalog_agent_ref.clone(),
253 ),
254 );
255
256 ValueSkipWatermarkIterator::new(
257 pk_skip_watermark_iter,
258 ValueSkipWatermarkState::from_safe_epoch_watermarks(
259 self.compact_task.value_table_watermarks.clone(),
260 compaction_catalog_agent_ref,
261 ),
262 )
263 };
264
265 Ok(combine_iter)
266 }
267}
268
269pub fn partition_overlapping_sstable_infos(
270 mut origin_infos: Vec<SstableInfo>,
271) -> Vec<Vec<SstableInfo>> {
272 pub struct SstableGroup {
273 ssts: Vec<SstableInfo>,
274 max_right_bound: Bytes,
275 }
276
277 impl PartialEq for SstableGroup {
278 fn eq(&self, other: &SstableGroup) -> bool {
279 self.max_right_bound == other.max_right_bound
280 }
281 }
282 impl PartialOrd for SstableGroup {
283 fn partial_cmp(&self, other: &SstableGroup) -> Option<std::cmp::Ordering> {
284 Some(self.cmp(other))
285 }
286 }
287 impl Eq for SstableGroup {}
288 impl Ord for SstableGroup {
289 fn cmp(&self, other: &SstableGroup) -> std::cmp::Ordering {
290 KeyComparator::compare_encoded_full_key(&other.max_right_bound, &self.max_right_bound)
292 }
293 }
294 let mut groups: BinaryHeap<SstableGroup> = BinaryHeap::default();
295 origin_infos.sort_by(|a, b| {
296 let x = &a.key_range;
297 let y = &b.key_range;
298 KeyComparator::compare_encoded_full_key(&x.left, &y.left)
299 });
300 for sst in origin_infos {
301 if let Some(mut prev_group) = groups.peek_mut()
303 && KeyComparator::encoded_full_key_less_than(
304 &prev_group.max_right_bound,
305 &sst.key_range.left,
306 )
307 {
308 prev_group.max_right_bound.clone_from(&sst.key_range.right);
309 prev_group.ssts.push(sst);
310 continue;
311 }
312 groups.push(SstableGroup {
313 max_right_bound: sst.key_range.right.clone(),
314 ssts: vec![sst],
315 });
316 }
317 assert!(!groups.is_empty());
318 groups.into_iter().map(|group| group.ssts).collect_vec()
319}
320
/// Executes `compact_task` with an already-acquired compaction catalog agent.
///
/// Steps:
/// 1. Generate the task's splits; on failure the task finishes as `ExecuteFailed`.
/// 2. Reserve the estimated memory for all splits from `memory_limiter`; if that is not
///    possible the task finishes as `NoAvailMemoryResourceCanceled`.
/// 3. When `optimize_by_copy_block` selects it, run the fast (copy-block) compactor, which only
///    supports blocked xor8/xor16 filters. Otherwise spawn one `CompactorRunner` per split and
///    collect their outputs.
///
/// A message on `shutdown_rx` cancels the task (`ManualCanceled`). On the split-runner path, any
/// non-success status aborts the spawned split tasks and discards collected outputs.
///
/// Returns the finished task (status and output SSTs filled in by `compact_done`), per-table
/// drop statistics, output object creation timestamps, and the memory tracker obtained in
/// step 2 (`None` if the task ended before or during the reservation).
pub async fn compact_with_agent(
    compactor_context: CompactorContext,
    mut compact_task: CompactTask,
    mut shutdown_rx: Receiver<()>,
    object_id_getter: Arc<dyn GetObjectId>,
    compaction_catalog_agent_ref: CompactionCatalogAgentRef,
) -> (
    (
        CompactTask,
        HashMap<TableId, TableStats>,
        HashMap<HummockSstableObjectId, u64>,
    ),
    Option<MemoryTracker>,
) {
    let context = compactor_context.clone();
    let group_label = compact_task.compaction_group_id.to_string();
    metrics_report_for_task(&compact_task, &context);

    // Duration histogram labelled by compaction group and the first input level.
    let timer = context
        .compactor_metrics
        .compact_task_duration
        .with_label_values(&[
            &group_label,
            &compact_task.input_ssts[0].level_idx.to_string(),
        ])
        .start_timer();

    let multi_filter = build_multi_compaction_filter(&compact_task);
    let mut task_status = TaskStatus::Success;
    let optimize_by_copy_block = optimize_by_copy_block(&compact_task, &context);

    if let Err(e) =
        generate_splits_for_task(&mut compact_task, &context, optimize_by_copy_block).await
    {
        tracing::warn!(error = %e.as_report(), "Failed to generate_splits");
        task_status = TaskStatus::ExecuteFailed;
        return (
            compact_done(compact_task, context.clone(), vec![], task_status),
            None,
        );
    }

    let compact_task_statistics = statistics_compact_task(&compact_task);
    let parallelism = compact_task.splits.len();
    assert_ne!(parallelism, 0, "splits cannot be empty");
    let mut output_ssts = Vec::with_capacity(parallelism);
    let mut compaction_futures = vec![];
    let mut abort_handles = vec![];
    let task_progress_guard =
        TaskProgressGuard::new(compact_task.task_id, context.task_progress_manager.clone());

    let capacity = estimate_task_output_capacity(context.clone(), &compact_task);

    // Per-split memory estimate multiplied by the split count, because all splits are polled
    // concurrently below (`buffer_unordered(parallelism)`). The S3 receive buffer size falls
    // back to 6 MiB when unset.
    let task_memory_capacity_with_parallelism = estimate_memory_for_compact_task(
        &compact_task,
        (context.storage_opts.block_size_kb as u64) * (1 << 10),
        context
            .storage_opts
            .object_store_config
            .s3
            .recv_buffer_size
            .unwrap_or(6 * 1024 * 1024) as u64,
        capacity as u64,
    ) * compact_task.splits.len() as u64;

    tracing::info!(
        "Ready to handle task: {} compact_task_statistics {:?} compression_algorithm {:?} parallelism {} task_memory_capacity_with_parallelism {}, enable fast runner: {}, {}",
        compact_task.task_id,
        compact_task_statistics,
        compact_task.compression_algorithm,
        parallelism,
        task_memory_capacity_with_parallelism,
        optimize_by_copy_block,
        compact_task_to_string(&compact_task),
    );

    let memory_detector = context
        .memory_limiter
        .try_require_memory(task_memory_capacity_with_parallelism);
    if memory_detector.is_none() {
        tracing::warn!(
            "Not enough memory to serve the task {} task_memory_capacity_with_parallelism {} memory_usage {} memory_quota {}",
            compact_task.task_id,
            task_memory_capacity_with_parallelism,
            context.memory_limiter.get_memory_usage(),
            context.memory_limiter.quota()
        );
        task_status = TaskStatus::NoAvailMemoryResourceCanceled;
        return (
            compact_done(compact_task, context.clone(), output_ssts, task_status),
            memory_detector,
        );
    }

    // Pending-task gauges; the scope guard undoes both increments on every exit path below.
    context.compactor_metrics.compact_task_pending_num.inc();
    context
        .compactor_metrics
        .compact_task_pending_parallelism
        .add(parallelism as _);
    let _release_metrics_guard =
        scopeguard::guard((parallelism, context.clone()), |(parallelism, context)| {
            context.compactor_metrics.compact_task_pending_num.dec();
            context
                .compactor_metrics
                .compact_task_pending_parallelism
                .sub(parallelism as _);
        });

    if optimize_by_copy_block {
        // Fast path: runs in the current task, generic over the blocked xor filter builder.
        let fast_compaction = {
            let context = context.clone();
            let task = compact_task.clone();
            let compaction_catalog_agent_ref = compaction_catalog_agent_ref.clone();
            let object_id_getter = object_id_getter.clone();
            let task_progress = task_progress_guard.progress.clone();

            async move {
                match task.sstable_filter_type {
                    PbSstableFilterType::SstableFilterXor8 => {
                        fast_compactor_runner::CompactorRunner::<BlockedXor8FilterBuilder, _>::new(
                            context,
                            task,
                            compaction_catalog_agent_ref,
                            object_id_getter,
                            task_progress,
                            multi_filter,
                        )
                        .run()
                        .await
                    }
                    PbSstableFilterType::SstableFilterXor16 => {
                        fast_compactor_runner::CompactorRunner::<BlockedXor16FilterBuilder, _>::new(
                            context,
                            task,
                            compaction_catalog_agent_ref,
                            object_id_getter,
                            task_progress,
                            multi_filter,
                        )
                        .run()
                        .await
                    }
                    filter_type => Err(HummockError::compaction_executor(format!(
                        "fast compaction only supports blocked xor filters, got {:?}",
                        filter_type
                    ))),
                }
            }
        };

        // Whichever completes first wins; on shutdown the fast compaction future is dropped.
        tokio::select! {
            _ = &mut shutdown_rx => {
                tracing::warn!("Compaction task cancelled externally:\n{}", compact_task_to_string(&compact_task));
                task_status = TaskStatus::ManualCanceled;
            },

            ret = fast_compaction => {
                match ret {
                    // The fast path produces a single output, recorded as split 0.
                    Ok((ssts, statistics)) => {
                        output_ssts.push((0, ssts, statistics));
                    }
                    Err(e) => {
                        task_status = TaskStatus::ExecuteFailed;
                        tracing::warn!(
                            error = %e.as_report(),
                            "Compaction task {} failed with error",
                            compact_task.task_id,
                        );
                    }
                }
            }
        }

        let (compact_task, table_stats, object_timestamps) =
            compact_done(compact_task, context.clone(), output_ssts, task_status);
        let cost_time = timer.stop_and_record() * 1000.0;
        tracing::info!(
            "Finished fast compaction task in {:?}ms: {}",
            cost_time,
            compact_task_to_string(&compact_task)
        );
        return (
            (compact_task, table_stats, object_timestamps),
            memory_detector,
        );
    }
    // Regular path: spawn one runner per split, keeping abort handles so outstanding splits
    // can be cancelled if the task fails or is cancelled.
    for (split_index, _) in compact_task.splits.iter().enumerate() {
        let filter = multi_filter.clone();
        let compaction_catalog_agent_ref = compaction_catalog_agent_ref.clone();
        let compactor_runner = CompactorRunner::new(
            split_index,
            compactor_context.clone(),
            compact_task.clone(),
            object_id_getter.clone(),
        );
        let task_progress = task_progress_guard.progress.clone();
        let runner = async move {
            compactor_runner
                .run(filter, compaction_catalog_agent_ref, task_progress)
                .await
        };
        // Register the runner in the await-tree registry when one is configured.
        let traced = match context.await_tree_reg.as_ref() {
            None => runner.right_future(),
            Some(await_tree_reg) => await_tree_reg
                .register(
                    await_tree_key::CompactRunner {
                        task_id: compact_task.task_id,
                        split_index,
                    },
                    format!(
                        "Compaction Task {} Split {} ",
                        compact_task.task_id, split_index
                    ),
                )
                .instrument(runner)
                .left_future(),
        };
        let handle = tokio::spawn(traced);
        abort_handles.push(handle.abort_handle());
        compaction_futures.push(handle);
    }

    // Collect split results as they complete; stop at the first failure or on shutdown.
    let mut buffered = stream::iter(compaction_futures).buffer_unordered(parallelism);
    loop {
        tokio::select! {
            _ = &mut shutdown_rx => {
                tracing::warn!("Compaction task cancelled externally:\n{}", compact_task_to_string(&compact_task));
                task_status = TaskStatus::ManualCanceled;
                break;
            }
            future_result = buffered.next() => {
                match future_result {
                    Some(Ok(Ok((split_index, ssts, compact_stat)))) => {
                        output_ssts.push((split_index, ssts, compact_stat));
                    }
                    // The runner returned an error.
                    Some(Ok(Err(e))) => {
                        task_status = TaskStatus::ExecuteFailed;
                        tracing::warn!(
                            error = %e.as_report(),
                            "Compaction task {} failed with error",
                            compact_task.task_id,
                        );
                        break;
                    }
                    // The spawned task itself failed (e.g. panicked or was aborted).
                    Some(Err(e)) => {
                        task_status = TaskStatus::JoinHandleFailed;
                        tracing::warn!(
                            error = %e.as_report(),
                            "Compaction task {} failed with join handle error",
                            compact_task.task_id,
                        );
                        break;
                    }
                    // All splits finished.
                    None => break,
                }
            }
        }
    }

    // On any non-success outcome, cancel remaining splits and drop partial outputs.
    if task_status != TaskStatus::Success {
        for abort_handle in abort_handles {
            abort_handle.abort();
        }
        output_ssts.clear();
    }
    // Splits complete out of order; restore split order before building `sorted_output_ssts`.
    if !output_ssts.is_empty() {
        output_ssts.sort_by_key(|(split_index, ..)| *split_index);
    }

    let (compact_task, table_stats, object_timestamps) =
        compact_done(compact_task, context.clone(), output_ssts, task_status);
    let cost_time = timer.stop_and_record() * 1000.0;
    tracing::info!(
        "Finished compaction task in {:?}ms: {}",
        cost_time,
        compact_task_output_to_string(&compact_task)
    );
    (
        (compact_task, table_stats, object_timestamps),
        memory_detector,
    )
}
611
612pub(crate) async fn acquire_complete_catalog_agent(
613 compaction_catalog_manager_ref: &CompactionCatalogManagerRef,
614 read_table_ids: Vec<StateTableId>,
615) -> HummockResult<CompactionCatalogAgentRef> {
616 let expected_table_ids = read_table_ids.iter().copied().collect::<HashSet<_>>();
617 let compaction_catalog_agent_ref = compaction_catalog_manager_ref
618 .acquire(read_table_ids)
619 .await?;
620 let acquired_table_ids = compaction_catalog_agent_ref
621 .table_ids()
622 .collect::<HashSet<_>>();
623
624 if acquired_table_ids != expected_table_ids {
625 let diff = expected_table_ids
626 .symmetric_difference(&acquired_table_ids)
627 .cloned()
628 .collect::<Vec<_>>();
629 return Err(HummockError::other(format!(
630 "some table ids are not acquired: {:?}",
631 diff
632 )));
633 }
634
635 Ok(compaction_catalog_agent_ref)
636}
637
638pub async fn compact(
641 compactor_context: CompactorContext,
642 compact_task: CompactTask,
643 shutdown_rx: Receiver<()>,
644 object_id_getter: Arc<dyn GetObjectId>,
645 compaction_catalog_manager_ref: CompactionCatalogManagerRef,
646) -> (
647 (
648 CompactTask,
649 HashMap<TableId, TableStats>,
650 HashMap<HummockSstableObjectId, u64>,
651 ),
652 Option<MemoryTracker>,
653) {
654 let read_table_ids = compact_task.get_table_ids_from_input_ssts().collect_vec();
655 let compaction_catalog_agent_ref =
656 match acquire_complete_catalog_agent(&compaction_catalog_manager_ref, read_table_ids).await
657 {
658 Ok(compaction_catalog_agent_ref) => compaction_catalog_agent_ref,
659 Err(e) => {
660 tracing::warn!(
661 error = %e.as_report(),
662 "Failed to acquire complete compaction catalog agent"
663 );
664 return (
665 compact_done(
666 compact_task,
667 compactor_context.clone(),
668 vec![],
669 TaskStatus::ExecuteFailed,
670 ),
671 None,
672 );
673 }
674 };
675
676 compact_with_agent(
677 compactor_context,
678 compact_task,
679 shutdown_rx,
680 object_id_getter,
681 compaction_catalog_agent_ref,
682 )
683 .await
684}
685
686pub(crate) fn compact_done(
688 mut compact_task: CompactTask,
689 context: CompactorContext,
690 output_ssts: Vec<CompactOutput>,
691 task_status: TaskStatus,
692) -> (
693 CompactTask,
694 HashMap<TableId, TableStats>,
695 HashMap<HummockSstableObjectId, u64>,
696) {
697 let mut table_stats_map = TableStatsMap::default();
698 let mut object_timestamps = HashMap::default();
699 compact_task.task_status = task_status;
700 compact_task
701 .sorted_output_ssts
702 .reserve(compact_task.splits.len());
703 let mut compaction_write_bytes = 0;
704 for (
705 _,
706 ssts,
707 CompactionStatistics {
708 delta_drop_stat, ..
709 },
710 ) in output_ssts
711 {
712 add_table_stats_map(&mut table_stats_map, &delta_drop_stat);
713 for sst_info in ssts {
714 compaction_write_bytes += sst_info.file_size();
715 object_timestamps.insert(sst_info.sst_info.object_id, sst_info.created_at);
716 compact_task.sorted_output_ssts.push(sst_info.sst_info);
717 }
718 }
719 assert_output_table_ids_subset_of_read_input(&compact_task);
720
721 let group_label = compact_task.compaction_group_id.to_string();
722 let level_label = compact_task.target_level.to_string();
723 context
724 .compactor_metrics
725 .compact_write_bytes
726 .with_label_values(&[&group_label, &level_label])
727 .inc_by(compaction_write_bytes);
728 context
729 .compactor_metrics
730 .compact_write_sstn
731 .with_label_values(&[&group_label, &level_label])
732 .inc_by(compact_task.sorted_output_ssts.len() as u64);
733
734 (compact_task, table_stats_map, object_timestamps)
735}
736
737fn assert_output_table_ids_subset_of_read_input(compact_task: &CompactTask) {
739 let read_table_ids = compact_task
740 .get_table_ids_from_input_ssts()
741 .collect::<HashSet<_>>();
742
743 for output_sst in &compact_task.sorted_output_ssts {
744 assert!(
745 output_sst
746 .table_ids
747 .iter()
748 .all(|table_id| read_table_ids.contains(table_id)),
749 "compaction output contains table ids outside task read input: task_id={}, output_sst_id={}, read_table_ids={:?}, output_table_ids={:?}",
750 compact_task.task_id,
751 output_sst.sst_id,
752 read_table_ids,
753 output_sst.table_ids,
754 );
755 }
756}
757
/// Drains `iter` over `task_config.key_range` into `sst_builder`, dropping keys that need not
/// be retained, and returns statistics about the run.
///
/// A key is dropped when any of these holds:
/// - `retain_multiple_version` is false, `gc_delete_keys` is true and the value is a delete;
/// - `retain_multiple_version` is false and the key is not the first (newest) version of its
///   user key seen by `full_key_tracker`;
/// - `compaction_filter.should_delete` returns true.
///
/// A surviving `Put` value whose table has a schema in `task_config.table_schemas` is passed to
/// `try_drop_invalid_columns`. If that returns a new value, the rewritten value is written
/// instead of the original.
///
/// `delta_drop_stat` in the result holds, per table, the negative deltas (key count, key size,
/// value size) caused by dropped keys and shrunk values, merged with the iterator's
/// `skipped_by_watermark_table_stats`.
pub async fn compact_and_build_sst<F>(
    sst_builder: &mut CapacitySplitTableBuilder<F>,
    task_config: &TaskConfig,
    compactor_metrics: Arc<CompactorMetrics>,
    mut iter: impl HummockIterator<Direction = Forward>,
    mut compaction_filter: impl CompactionFilter,
) -> HummockResult<CompactionStatistics>
where
    F: TableBuilderFactory,
{
    // Start at the left bound of the range, or at the very beginning if it is unbounded.
    if !task_config.key_range.left.is_empty() {
        let full_key = FullKey::decode(&task_config.key_range.left);
        iter.seek(full_key)
            .instrument_await("iter_seek".verbose())
            .await?;
    } else {
        iter.rewind().instrument_await("rewind".verbose()).await?;
    };

    // An empty right bound means the range is unbounded on the right.
    let end_key = if task_config.key_range.right.is_empty() {
        FullKey::default()
    } else {
        FullKey::decode(&task_config.key_range.right).to_vec()
    };
    let max_key = end_key.to_ref();

    let mut full_key_tracker = FullKeyTracker::<Vec<u8>>::new(FullKey::default());
    let mut local_stats = StoreLocalStatistic::default();

    // Per-table drop deltas; `last_table_stats` accumulates for the table currently iterated.
    let mut table_stats_drop = TableStatsMap::default();
    let mut last_table_stats = TableStats::default();
    let mut last_table_id = None;
    let mut compaction_statistics = CompactionStatistics::default();
    // object id -> last block id whose values passed the schema check without dropping columns.
    let mut skip_schema_check: HashMap<HummockSstableObjectId, u64> = HashMap::default();
    let schemas: HashMap<TableId, HashSet<i32>> = task_config
        .table_schemas
        .iter()
        .map(|(table_id, schema)| (*table_id, schema.column_ids.iter().copied().collect()))
        .collect();
    while iter.is_valid() {
        let iter_key = iter.key();
        compaction_statistics.iter_total_key_counts += 1;

        let is_new_user_key = full_key_tracker.observe(iter.key());
        let mut drop = false;

        let value = iter.value();
        let ValueMeta {
            object_id,
            block_id,
        } = iter.value_meta();
        if is_new_user_key {
            // The right bound is exclusive: stop at the first user key at or beyond it.
            if !max_key.is_empty() && iter_key >= max_key {
                break;
            }
            if value.is_delete() {
                local_stats.skip_delete_key_count += 1;
            }
        } else {
            local_stats.skip_multi_version_key_count += 1;
        }

        // Flush the accumulated drop stats whenever the table id changes.
        if last_table_id != Some(iter_key.user_key.table_id) {
            if let Some(last_table_id) = last_table_id.take() {
                table_stats_drop.insert(last_table_id, std::mem::take(&mut last_table_stats));
            }
            last_table_id = Some(iter_key.user_key.table_id);
        }

        if (!task_config.retain_multiple_version && task_config.gc_delete_keys && value.is_delete())
            || (!task_config.retain_multiple_version && !is_new_user_key)
        {
            drop = true;
        }

        if !drop && compaction_filter.should_delete(iter_key) {
            drop = true;
        }

        if drop {
            compaction_statistics.iter_drop_key_counts += 1;

            last_table_stats.total_key_count -= 1;
            last_table_stats.total_key_size -= iter_key.encoded_len() as i64;
            last_table_stats.total_value_size -= iter.value().encoded_len() as i64;
            iter.next()
                .instrument_await("iter_next_in_drop".verbose())
                .await?;
            continue;
        }

        let check_table_id = iter_key.user_key.table_id;
        let mut is_value_rewritten = false;
        // Only check `Put` values that carry object/block metadata and whose table has a
        // schema. Skip the check when this (object, block) already passed it. The assert
        // requires blocks of one object to be visited in non-decreasing order.
        if let HummockValue::Put(v) = value
            && let Some(object_id) = object_id
            && let Some(block_id) = block_id
            && !skip_schema_check
                .get(&object_id)
                .map(|prev_block_id| {
                    assert!(*prev_block_id <= block_id);
                    *prev_block_id == block_id
                })
                .unwrap_or(false)
            && let Some(schema) = schemas.get(&check_table_id)
        {
            let value_size = v.len();
            match try_drop_invalid_columns(v, schema) {
                None => {
                    // Nothing was dropped. Remember this block so later values in it skip the
                    // check (presumably assumes values in one block share a schema — confirm).
                    if !task_config.disable_drop_column_optimization {
                        skip_schema_check.insert(object_id, block_id);
                    }
                }
                Some(new_value) => {
                    is_value_rewritten = true;
                    let new_put = HummockValue::put(new_value.as_slice());
                    sst_builder
                        .add_full_key(iter_key, new_put, is_new_user_key)
                        .instrument_await("add_rewritten_full_key".verbose())
                        .await?;
                    let value_size_change = value_size as i64 - new_value.len() as i64;
                    assert!(value_size_change >= 0);
                    last_table_stats.total_value_size -= value_size_change;
                }
            }
        }

        if !is_value_rewritten {
            sst_builder
                .add_full_key(iter_key, value, is_new_user_key)
                .instrument_await("add_full_key".verbose())
                .await?;
        }

        iter.next().instrument_await("iter_next".verbose()).await?;
    }

    // Flush the stats of the last table seen.
    if let Some(last_table_id) = last_table_id.take() {
        table_stats_drop.insert(last_table_id, std::mem::take(&mut last_table_stats));
    }
    iter.collect_local_statistic(&mut local_stats);
    add_table_stats_map(
        &mut table_stats_drop,
        &local_stats.skipped_by_watermark_table_stats,
    );
    local_stats.report_compactor(compactor_metrics.as_ref());
    compaction_statistics.delta_drop_stat = table_stats_drop;

    Ok(compaction_statistics)
}
920
#[cfg(test)]
pub mod tests {
    use risingwave_hummock_sdk::can_concat;

    use crate::hummock::compactor::compactor_runner::partition_overlapping_sstable_infos;
    use crate::hummock::iterator::test_utils::mock_sstable_store;
    use crate::hummock::test_utils::{
        default_builder_opt_for_test, gen_test_sstable_info, test_key_of, test_value_of,
    };
    use crate::hummock::value::HummockValue;

    /// SST `i` covers key indices `[i * 10, i * 10 + 20)`, so each SST overlaps its neighbours
    /// and the partition is expected to produce exactly two concatenable groups.
    #[tokio::test]
    async fn test_partition_overlapping_level() {
        const TEST_KEYS_COUNT: usize = 10;
        const SST_COUNT: usize = 10;

        let sstable_store = mock_sstable_store().await;
        let mut ssts = Vec::with_capacity(SST_COUNT);
        for object_id in 0..SST_COUNT {
            let first_key = object_id * TEST_KEYS_COUNT;
            let key_indices = first_key..first_key + 2 * TEST_KEYS_COUNT;
            let sst = gen_test_sstable_info(
                default_builder_opt_for_test(),
                object_id as u64,
                key_indices.map(|i| (test_key_of(i), HummockValue::put(test_value_of(i)))),
                sstable_store.clone(),
            )
            .await;
            ssts.push(sst);
        }

        let groups = partition_overlapping_sstable_infos(ssts);
        assert_eq!(groups.len(), 2);
        for group in &groups {
            assert!(can_concat(group));
        }
    }
}