Skip to main content

risingwave_storage/hummock/compactor/
compactor_runner.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BinaryHeap, HashMap, HashSet};
16use std::sync::Arc;
17
18use await_tree::{InstrumentAwait, SpanExt};
19use bytes::Bytes;
20use futures::{FutureExt, StreamExt, stream};
21use itertools::Itertools;
22use risingwave_common::catalog::TableId;
23use risingwave_common::util::value_encoding::column_aware_row_encoding::try_drop_invalid_columns;
24use risingwave_hummock_sdk::compact::{
25    compact_task_to_string, estimate_memory_for_compact_task, statistics_compact_task,
26};
27use risingwave_hummock_sdk::compact_task::CompactTask;
28use risingwave_hummock_sdk::compaction_group::StateTableId;
29use risingwave_hummock_sdk::key::{FullKey, FullKeyTracker};
30use risingwave_hummock_sdk::key_range::{KeyRange, KeyRangeCommon};
31use risingwave_hummock_sdk::sstable_info::SstableInfo;
32use risingwave_hummock_sdk::table_stats::{TableStats, TableStatsMap, add_table_stats_map};
33use risingwave_hummock_sdk::{
34    HummockSstableObjectId, KeyComparator, can_concat, compact_task_output_to_string,
35    full_key_can_concat,
36};
37use risingwave_pb::hummock::compact_task::TaskStatus;
38use risingwave_pb::hummock::{LevelType, PbSstableFilterType};
39use thiserror_ext::AsReport;
40use tokio::sync::oneshot::Receiver;
41
42use super::iterator::MonitoredCompactorIterator;
43use super::task_progress::TaskProgress;
44use super::{CompactionStatistics, TaskConfig};
45use crate::compaction_catalog_manager::{CompactionCatalogAgentRef, CompactionCatalogManagerRef};
46use crate::hummock::compactor::compaction_utils::{
47    blocked_xor_filter_key_count_threshold, build_multi_compaction_filter,
48    estimate_output_key_count_for_task, estimate_task_output_capacity, generate_splits_for_task,
49    metrics_report_for_task, optimize_by_copy_block,
50};
51use crate::hummock::compactor::iterator::ConcatSstableIterator;
52use crate::hummock::compactor::task_progress::TaskProgressGuard;
53use crate::hummock::compactor::{
54    CompactOutput, CompactionFilter, Compactor, CompactorContext, await_tree_key,
55    fast_compactor_runner,
56};
57use crate::hummock::iterator::{
58    Forward, HummockIterator, MergeIterator, NonPkPrefixSkipWatermarkIterator,
59    NonPkPrefixSkipWatermarkState, PkPrefixSkipWatermarkIterator, PkPrefixSkipWatermarkState,
60    ValueMeta, ValueSkipWatermarkIterator, ValueSkipWatermarkState,
61};
62use crate::hummock::multi_builder::{CapacitySplitTableBuilder, TableBuilderFactory};
63use crate::hummock::utils::MemoryTracker;
64use crate::hummock::value::HummockValue;
65use crate::hummock::{
66    BlockedXor8FilterBuilder, BlockedXor16FilterBuilder, CachePolicy, CompressionAlgorithm,
67    GetObjectId, HummockError, HummockResult, SstableBuilderOptions, SstableStoreRef,
68};
69use crate::monitor::{CompactorMetrics, StoreLocalStatistic};
70pub struct CompactorRunner {
71    compact_task: CompactTask,
72    compactor: Compactor,
73    sstable_store: SstableStoreRef,
74    key_range: KeyRange,
75    split_index: usize,
76}
77
78impl CompactorRunner {
79    pub fn new(
80        split_index: usize,
81        context: CompactorContext,
82        task: CompactTask,
83        object_id_getter: Arc<dyn GetObjectId>,
84    ) -> Self {
85        let mut options: SstableBuilderOptions = context.storage_opts.as_ref().into();
86        options.compression_algorithm = match task.compression_algorithm {
87            0 => CompressionAlgorithm::None,
88            1 => CompressionAlgorithm::Lz4,
89            _ => CompressionAlgorithm::Zstd,
90        };
91
92        options.capacity = estimate_task_output_capacity(context.clone(), &task);
93        let estimated_output_key_count =
94            estimate_output_key_count_for_task(&task, options.capacity);
95        options.estimated_output_key_count = Some(estimated_output_key_count);
96        options.filter_hash_prealloc_key_count_cap =
97            blocked_xor_filter_key_count_threshold(task.blocked_xor_filter_kv_count_threshold);
98        options.max_vnode_key_range_bytes = task.effective_max_vnode_key_range_bytes();
99        let sstable_filter_layout =
100            task.sstable_filter_layout_for_output(estimated_output_key_count as u64);
101
102        let key_range = KeyRange {
103            left: task.splits[split_index].left.clone(),
104            right: task.splits[split_index].right.clone(),
105            right_exclusive: true,
106        };
107        let compactor = Compactor::new(
108            context.clone(),
109            options,
110            TaskConfig {
111                key_range: key_range.clone(),
112                cache_policy: CachePolicy::NotFill,
113                gc_delete_keys: task.gc_delete_keys,
114                retain_multiple_version: false,
115                sstable_filter_layout,
116                sstable_filter_type: task.sstable_filter_type,
117                table_vnode_partition: task.table_vnode_partition.clone(),
118                table_schemas: task
119                    .table_schemas
120                    .iter()
121                    .map(|(k, v)| (*k, v.clone()))
122                    .collect(),
123                disable_drop_column_optimization: false,
124            },
125            object_id_getter,
126        );
127
128        Self {
129            compactor,
130            compact_task: task,
131            sstable_store: context.sstable_store,
132            key_range,
133            split_index,
134        }
135    }
136
137    pub async fn run(
138        &self,
139        compaction_filter: impl CompactionFilter,
140        compaction_catalog_agent_ref: CompactionCatalogAgentRef,
141        task_progress: Arc<TaskProgress>,
142    ) -> HummockResult<CompactOutput> {
143        let iter =
144            self.build_sst_iter(task_progress.clone(), compaction_catalog_agent_ref.clone())?;
145        let (ssts, compaction_stat) = self
146            .compactor
147            .compact_key_range(
148                iter,
149                compaction_filter,
150                compaction_catalog_agent_ref,
151                Some(task_progress),
152                Some(self.compact_task.task_id),
153                Some(self.split_index),
154            )
155            .await?;
156        Ok((self.split_index, ssts, compaction_stat))
157    }
158
159    /// Build the merge iterator based on the given input ssts.
160    fn build_sst_iter(
161        &self,
162        task_progress: Arc<TaskProgress>,
163        compaction_catalog_agent_ref: CompactionCatalogAgentRef,
164    ) -> HummockResult<impl HummockIterator<Direction = Forward> + use<>> {
165        let compactor_iter_max_io_retry_times = self
166            .compactor
167            .context
168            .storage_opts
169            .compactor_iter_max_io_retry_times;
170        let mut table_iters = Vec::new();
171
172        for level in &self.compact_task.input_ssts {
173            let tables = level
174                .read_sstable_infos()
175                .filter(|table_info| self.key_range.full_key_overlap(&table_info.key_range))
176                .cloned()
177                .collect_vec();
178            if tables.is_empty() {
179                continue;
180            }
181
182            if level.level_type == LevelType::Nonoverlapping {
183                debug_assert!(can_concat(&tables));
184                table_iters.push(ConcatSstableIterator::new(
185                    tables,
186                    self.compactor.task_config.key_range.clone(),
187                    self.sstable_store.clone(),
188                    task_progress.clone(),
189                    compactor_iter_max_io_retry_times,
190                ));
191            } else if tables.len()
192                > self
193                    .compactor
194                    .context
195                    .storage_opts
196                    .compactor_max_overlap_sst_count
197            {
198                let sst_groups = partition_overlapping_sstable_infos(tables);
199                tracing::warn!(
200                    "COMPACT A LARGE OVERLAPPING LEVEL: try to partition {} ssts with {} groups",
201                    sst_groups.iter().map(Vec::len).sum::<usize>(),
202                    sst_groups.len()
203                );
204                for (idx, table_infos) in sst_groups.into_iter().enumerate() {
205                    // Overlapping sstables may contains ssts with same user key (generated by spilled), so we need to check concat with full key.
206                    assert!(
207                        full_key_can_concat(&table_infos),
208                        "sst_group idx {:?} table_infos: {:?}",
209                        idx,
210                        table_infos
211                    );
212                    table_iters.push(ConcatSstableIterator::new(
213                        table_infos,
214                        self.compactor.task_config.key_range.clone(),
215                        self.sstable_store.clone(),
216                        task_progress.clone(),
217                        compactor_iter_max_io_retry_times,
218                    ));
219                }
220            } else {
221                for table_info in tables {
222                    table_iters.push(ConcatSstableIterator::new(
223                        vec![table_info],
224                        self.compactor.task_config.key_range.clone(),
225                        self.sstable_store.clone(),
226                        task_progress.clone(),
227                        compactor_iter_max_io_retry_times,
228                    ));
229                }
230            }
231        }
232
233        // // The `SkipWatermarkIterator` is used to handle the table watermark state cleaning introduced
234        // // in https://github.com/risingwavelabs/risingwave/issues/13148
235        // The `Pk/NonPkPrefixSkipWatermarkIterator` is used to handle the table watermark state cleaning introduced
236        // in https://github.com/risingwavelabs/risingwave/issues/13148
237        let combine_iter = {
238            let skip_watermark_iter = PkPrefixSkipWatermarkIterator::new(
239                MonitoredCompactorIterator::new(
240                    MergeIterator::for_compactor(table_iters),
241                    task_progress,
242                ),
243                PkPrefixSkipWatermarkState::from_safe_epoch_watermarks(
244                    self.compact_task.pk_prefix_table_watermarks.clone(),
245                ),
246            );
247
248            let pk_skip_watermark_iter = NonPkPrefixSkipWatermarkIterator::new(
249                skip_watermark_iter,
250                NonPkPrefixSkipWatermarkState::from_safe_epoch_watermarks(
251                    self.compact_task.non_pk_prefix_table_watermarks.clone(),
252                    compaction_catalog_agent_ref.clone(),
253                ),
254            );
255
256            ValueSkipWatermarkIterator::new(
257                pk_skip_watermark_iter,
258                ValueSkipWatermarkState::from_safe_epoch_watermarks(
259                    self.compact_task.value_table_watermarks.clone(),
260                    compaction_catalog_agent_ref,
261                ),
262            )
263        };
264
265        Ok(combine_iter)
266    }
267}
268
269pub fn partition_overlapping_sstable_infos(
270    mut origin_infos: Vec<SstableInfo>,
271) -> Vec<Vec<SstableInfo>> {
272    pub struct SstableGroup {
273        ssts: Vec<SstableInfo>,
274        max_right_bound: Bytes,
275    }
276
277    impl PartialEq for SstableGroup {
278        fn eq(&self, other: &SstableGroup) -> bool {
279            self.max_right_bound == other.max_right_bound
280        }
281    }
282    impl PartialOrd for SstableGroup {
283        fn partial_cmp(&self, other: &SstableGroup) -> Option<std::cmp::Ordering> {
284            Some(self.cmp(other))
285        }
286    }
287    impl Eq for SstableGroup {}
288    impl Ord for SstableGroup {
289        fn cmp(&self, other: &SstableGroup) -> std::cmp::Ordering {
290            // Pick group with the smallest right bound for every new sstable.
291            KeyComparator::compare_encoded_full_key(&other.max_right_bound, &self.max_right_bound)
292        }
293    }
294    let mut groups: BinaryHeap<SstableGroup> = BinaryHeap::default();
295    origin_infos.sort_by(|a, b| {
296        let x = &a.key_range;
297        let y = &b.key_range;
298        KeyComparator::compare_encoded_full_key(&x.left, &y.left)
299    });
300    for sst in origin_infos {
301        // Pick group with the smallest right bound for every new sstable. So do not check the larger one if the smallest one does not meet condition.
302        if let Some(mut prev_group) = groups.peek_mut()
303            && KeyComparator::encoded_full_key_less_than(
304                &prev_group.max_right_bound,
305                &sst.key_range.left,
306            )
307        {
308            prev_group.max_right_bound.clone_from(&sst.key_range.right);
309            prev_group.ssts.push(sst);
310            continue;
311        }
312        groups.push(SstableGroup {
313            max_right_bound: sst.key_range.right.clone(),
314            ssts: vec![sst],
315        });
316    }
317    assert!(!groups.is_empty());
318    groups.into_iter().map(|group| group.ssts).collect_vec()
319}
320
/// Runs a compaction task with an already-acquired compaction catalog agent and returns the
/// finished task.
///
/// This function never returns an error. Every failure is recorded in the returned task's
/// `task_status` and left for hummock manager to handle. Possible statuses include
/// `ExecuteFailed`, `NoAvailMemoryResourceCanceled`, `ManualCanceled` and `JoinHandleFailed`.
///
/// The task runs in one of two ways:
/// - by the fast (copy-block) runner as a single unit, or
/// - by one spawned [`CompactorRunner`] per split.
///
/// A signal received on `shutdown_rx` cancels the task.
///
/// Returns `((task, table stats of dropped keys, output object id -> created_at), tracker)`.
/// `tracker` is the `MemoryTracker` obtained from `memory_limiter.try_require_memory`. It is
/// `None` when split generation failed or when the memory reservation failed.
pub async fn compact_with_agent(
    compactor_context: CompactorContext,
    mut compact_task: CompactTask,
    mut shutdown_rx: Receiver<()>,
    object_id_getter: Arc<dyn GetObjectId>,
    compaction_catalog_agent_ref: CompactionCatalogAgentRef,
) -> (
    (
        CompactTask,
        HashMap<TableId, TableStats>,
        HashMap<HummockSstableObjectId, u64>,
    ),
    Option<MemoryTracker>,
) {
    let context = compactor_context.clone();
    let group_label = compact_task.compaction_group_id.to_string();
    metrics_report_for_task(&compact_task, &context);

    // Task duration histogram, labelled by compaction group and the level of the first input
    // level. NOTE(review): `input_ssts[0]` assumes the task has at least one input level.
    let timer = context
        .compactor_metrics
        .compact_task_duration
        .with_label_values(&[
            &group_label,
            &compact_task.input_ssts[0].level_idx.to_string(),
        ])
        .start_timer();

    let multi_filter = build_multi_compaction_filter(&compact_task);
    let mut task_status = TaskStatus::Success;
    let optimize_by_copy_block = optimize_by_copy_block(&compact_task, &context);

    // Fill in `compact_task.splits`. On failure the task finishes as `ExecuteFailed` before
    // any memory is reserved.
    if let Err(e) =
        generate_splits_for_task(&mut compact_task, &context, optimize_by_copy_block).await
    {
        tracing::warn!(error = %e.as_report(), "Failed to generate_splits");
        task_status = TaskStatus::ExecuteFailed;
        return (
            compact_done(compact_task, context.clone(), vec![], task_status),
            None,
        );
    }

    let compact_task_statistics = statistics_compact_task(&compact_task);
    // Number of splits (key ranges) equals the number of split runners spawned on the
    // normal (non-fast) path below.
    let parallelism = compact_task.splits.len();
    assert_ne!(parallelism, 0, "splits cannot be empty");
    let mut output_ssts = Vec::with_capacity(parallelism);
    let mut compaction_futures = vec![];
    let mut abort_handles = vec![];
    let task_progress_guard =
        TaskProgressGuard::new(compact_task.task_id, context.task_progress_manager.clone());

    let capacity = estimate_task_output_capacity(context.clone(), &compact_task);

    // Memory estimate for one split, multiplied by the number of splits. The per-split
    // estimate uses the block size in bytes, the S3 receive buffer (default 6 MiB) and the
    // output capacity.
    let task_memory_capacity_with_parallelism = estimate_memory_for_compact_task(
        &compact_task,
        (context.storage_opts.block_size_kb as u64) * (1 << 10),
        context
            .storage_opts
            .object_store_config
            .s3
            .recv_buffer_size
            .unwrap_or(6 * 1024 * 1024) as u64,
        capacity as u64,
    ) * compact_task.splits.len() as u64;

    tracing::info!(
        "Ready to handle task: {} compact_task_statistics {:?} compression_algorithm {:?}  parallelism {} task_memory_capacity_with_parallelism {}, enable fast runner: {}, {}",
        compact_task.task_id,
        compact_task_statistics,
        compact_task.compression_algorithm,
        parallelism,
        task_memory_capacity_with_parallelism,
        optimize_by_copy_block,
        compact_task_to_string(&compact_task),
    );

    // If the task does not have enough memory, it should cancel the task and let the meta
    // reschedule it, so that it does not occupy the compactor's resources.
    let memory_detector = context
        .memory_limiter
        .try_require_memory(task_memory_capacity_with_parallelism);
    if memory_detector.is_none() {
        tracing::warn!(
            "Not enough memory to serve the task {} task_memory_capacity_with_parallelism {}  memory_usage {} memory_quota {}",
            compact_task.task_id,
            task_memory_capacity_with_parallelism,
            context.memory_limiter.get_memory_usage(),
            context.memory_limiter.quota()
        );
        task_status = TaskStatus::NoAvailMemoryResourceCanceled;
        return (
            compact_done(compact_task, context.clone(), output_ssts, task_status),
            memory_detector,
        );
    }

    // Track the task as pending. The scope guard undoes both increments when it is dropped,
    // i.e. on every exit from this function from here on.
    context.compactor_metrics.compact_task_pending_num.inc();
    context
        .compactor_metrics
        .compact_task_pending_parallelism
        .add(parallelism as _);
    let _release_metrics_guard =
        scopeguard::guard((parallelism, context.clone()), |(parallelism, context)| {
            context.compactor_metrics.compact_task_pending_num.dec();
            context
                .compactor_metrics
                .compact_task_pending_parallelism
                .sub(parallelism as _);
        });

    // Fast path: the whole task is run by the copy-block runner as a single unit, and its
    // output is recorded as split 0. Only blocked XOR filter types are supported here.
    if optimize_by_copy_block {
        let fast_compaction = {
            let context = context.clone();
            let task = compact_task.clone();
            let compaction_catalog_agent_ref = compaction_catalog_agent_ref.clone();
            let object_id_getter = object_id_getter.clone();
            let task_progress = task_progress_guard.progress.clone();

            async move {
                match task.sstable_filter_type {
                    PbSstableFilterType::SstableFilterXor8 => {
                        fast_compactor_runner::CompactorRunner::<BlockedXor8FilterBuilder, _>::new(
                            context,
                            task,
                            compaction_catalog_agent_ref,
                            object_id_getter,
                            task_progress,
                            multi_filter,
                        )
                        .run()
                        .await
                    }
                    PbSstableFilterType::SstableFilterXor16 => {
                        fast_compactor_runner::CompactorRunner::<BlockedXor16FilterBuilder, _>::new(
                            context,
                            task,
                            compaction_catalog_agent_ref,
                            object_id_getter,
                            task_progress,
                            multi_filter,
                        )
                        .run()
                        .await
                    }
                    filter_type => Err(HummockError::compaction_executor(format!(
                        "fast compaction only supports blocked xor filters, got {:?}",
                        filter_type
                    ))),
                }
            }
        };

        // Race the fast compaction against an external shutdown signal. On shutdown the
        // compaction future is dropped and the task is marked `ManualCanceled`.
        tokio::select! {
            _ = &mut shutdown_rx => {
                tracing::warn!("Compaction task cancelled externally:\n{}", compact_task_to_string(&compact_task));
                task_status = TaskStatus::ManualCanceled;
            },

            ret = fast_compaction => {
                match ret {
                    Ok((ssts, statistics)) => {
                        output_ssts.push((0, ssts, statistics));
                    }
                    Err(e) => {
                        task_status = TaskStatus::ExecuteFailed;
                        tracing::warn!(
                            error = %e.as_report(),
                            "Compaction task {} failed with error",
                            compact_task.task_id,
                        );
                    }
                }
            }
        }

        // After a compaction is done, mutate the compaction task.
        let (compact_task, table_stats, object_timestamps) =
            compact_done(compact_task, context.clone(), output_ssts, task_status);
        let cost_time = timer.stop_and_record() * 1000.0;
        tracing::info!(
            "Finished fast compaction task in {:?}ms: {}",
            cost_time,
            compact_task_to_string(&compact_task)
        );
        return (
            (compact_task, table_stats, object_timestamps),
            memory_detector,
        );
    }
    // Normal path: spawn one tokio task per split. When an await-tree registry is configured,
    // each task is registered with it.
    for (split_index, _) in compact_task.splits.iter().enumerate() {
        let filter = multi_filter.clone();
        let compaction_catalog_agent_ref = compaction_catalog_agent_ref.clone();
        let compactor_runner = CompactorRunner::new(
            split_index,
            compactor_context.clone(),
            compact_task.clone(),
            object_id_getter.clone(),
        );
        let task_progress = task_progress_guard.progress.clone();
        let runner = async move {
            compactor_runner
                .run(filter, compaction_catalog_agent_ref, task_progress)
                .await
        };
        let traced = match context.await_tree_reg.as_ref() {
            None => runner.right_future(),
            Some(await_tree_reg) => await_tree_reg
                .register(
                    await_tree_key::CompactRunner {
                        task_id: compact_task.task_id,
                        split_index,
                    },
                    format!(
                        "Compaction Task {} Split {} ",
                        compact_task.task_id, split_index
                    ),
                )
                .instrument(runner)
                .left_future(),
        };
        let handle = tokio::spawn(traced);
        abort_handles.push(handle.abort_handle());
        compaction_futures.push(handle);
    }

    // The split runners are already running (spawned above), so this only collects their
    // results in completion order. Collection stops at the first failure or on an external
    // shutdown signal.
    let mut buffered = stream::iter(compaction_futures).buffer_unordered(parallelism);
    loop {
        tokio::select! {
            _ = &mut shutdown_rx => {
                tracing::warn!("Compaction task cancelled externally:\n{}", compact_task_to_string(&compact_task));
                task_status = TaskStatus::ManualCanceled;
                break;
            }
            future_result = buffered.next() => {
                match future_result {
                    Some(Ok(Ok((split_index, ssts, compact_stat)))) => {
                        output_ssts.push((split_index, ssts, compact_stat));
                    }
                    Some(Ok(Err(e))) => {
                        task_status = TaskStatus::ExecuteFailed;
                        tracing::warn!(
                            error = %e.as_report(),
                            "Compaction task {} failed with error",
                            compact_task.task_id,
                        );
                        break;
                    }
                    Some(Err(e)) => {
                        task_status = TaskStatus::JoinHandleFailed;
                        tracing::warn!(
                            error = %e.as_report(),
                            "Compaction task {} failed with join handle error",
                            compact_task.task_id,
                        );
                        break;
                    }
                    None => break,
                }
            }
        }
    }

    // On failure or cancellation, abort any split runners still running and discard partial
    // output. This ensures only a complete result is reported.
    if task_status != TaskStatus::Success {
        for abort_handle in abort_handles {
            abort_handle.abort();
        }
        output_ssts.clear();
    }
    // Sort by split/key range index.
    if !output_ssts.is_empty() {
        output_ssts.sort_by_key(|(split_index, ..)| *split_index);
    }

    // After a compaction is done, mutate the compaction task.
    let (compact_task, table_stats, object_timestamps) =
        compact_done(compact_task, context.clone(), output_ssts, task_status);
    let cost_time = timer.stop_and_record() * 1000.0;
    tracing::info!(
        "Finished compaction task in {:?}ms: {}",
        cost_time,
        compact_task_output_to_string(&compact_task)
    );
    (
        (compact_task, table_stats, object_timestamps),
        memory_detector,
    )
}
611
612pub(crate) async fn acquire_complete_catalog_agent(
613    compaction_catalog_manager_ref: &CompactionCatalogManagerRef,
614    read_table_ids: Vec<StateTableId>,
615) -> HummockResult<CompactionCatalogAgentRef> {
616    let expected_table_ids = read_table_ids.iter().copied().collect::<HashSet<_>>();
617    let compaction_catalog_agent_ref = compaction_catalog_manager_ref
618        .acquire(read_table_ids)
619        .await?;
620    let acquired_table_ids = compaction_catalog_agent_ref
621        .table_ids()
622        .collect::<HashSet<_>>();
623
624    if acquired_table_ids != expected_table_ids {
625        let diff = expected_table_ids
626            .symmetric_difference(&acquired_table_ids)
627            .cloned()
628            .collect::<Vec<_>>();
629        return Err(HummockError::other(format!(
630            "some table ids are not acquired: {:?}",
631            diff
632        )));
633    }
634
635    Ok(compaction_catalog_agent_ref)
636}
637
638/// Handles a compaction task and reports its status to hummock manager.
639/// Always return `Ok` and let hummock manager handle errors.
640pub async fn compact(
641    compactor_context: CompactorContext,
642    compact_task: CompactTask,
643    shutdown_rx: Receiver<()>,
644    object_id_getter: Arc<dyn GetObjectId>,
645    compaction_catalog_manager_ref: CompactionCatalogManagerRef,
646) -> (
647    (
648        CompactTask,
649        HashMap<TableId, TableStats>,
650        HashMap<HummockSstableObjectId, u64>,
651    ),
652    Option<MemoryTracker>,
653) {
654    let read_table_ids = compact_task.get_table_ids_from_input_ssts().collect_vec();
655    let compaction_catalog_agent_ref =
656        match acquire_complete_catalog_agent(&compaction_catalog_manager_ref, read_table_ids).await
657        {
658            Ok(compaction_catalog_agent_ref) => compaction_catalog_agent_ref,
659            Err(e) => {
660                tracing::warn!(
661                    error = %e.as_report(),
662                    "Failed to acquire complete compaction catalog agent"
663                );
664                return (
665                    compact_done(
666                        compact_task,
667                        compactor_context.clone(),
668                        vec![],
669                        TaskStatus::ExecuteFailed,
670                    ),
671                    None,
672                );
673            }
674        };
675
676    compact_with_agent(
677        compactor_context,
678        compact_task,
679        shutdown_rx,
680        object_id_getter,
681        compaction_catalog_agent_ref,
682    )
683    .await
684}
685
686/// Fills in the compact task and tries to report the task result to meta node.
687pub(crate) fn compact_done(
688    mut compact_task: CompactTask,
689    context: CompactorContext,
690    output_ssts: Vec<CompactOutput>,
691    task_status: TaskStatus,
692) -> (
693    CompactTask,
694    HashMap<TableId, TableStats>,
695    HashMap<HummockSstableObjectId, u64>,
696) {
697    let mut table_stats_map = TableStatsMap::default();
698    let mut object_timestamps = HashMap::default();
699    compact_task.task_status = task_status;
700    compact_task
701        .sorted_output_ssts
702        .reserve(compact_task.splits.len());
703    let mut compaction_write_bytes = 0;
704    for (
705        _,
706        ssts,
707        CompactionStatistics {
708            delta_drop_stat, ..
709        },
710    ) in output_ssts
711    {
712        add_table_stats_map(&mut table_stats_map, &delta_drop_stat);
713        for sst_info in ssts {
714            compaction_write_bytes += sst_info.file_size();
715            object_timestamps.insert(sst_info.sst_info.object_id, sst_info.created_at);
716            compact_task.sorted_output_ssts.push(sst_info.sst_info);
717        }
718    }
719    assert_output_table_ids_subset_of_read_input(&compact_task);
720
721    let group_label = compact_task.compaction_group_id.to_string();
722    let level_label = compact_task.target_level.to_string();
723    context
724        .compactor_metrics
725        .compact_write_bytes
726        .with_label_values(&[&group_label, &level_label])
727        .inc_by(compaction_write_bytes);
728    context
729        .compactor_metrics
730        .compact_write_sstn
731        .with_label_values(&[&group_label, &level_label])
732        .inc_by(compact_task.sorted_output_ssts.len() as u64);
733
734    (compact_task, table_stats_map, object_timestamps)
735}
736
737/// Compactor output must only contain table ids that are readable in the normalized task input.
738fn assert_output_table_ids_subset_of_read_input(compact_task: &CompactTask) {
739    let read_table_ids = compact_task
740        .get_table_ids_from_input_ssts()
741        .collect::<HashSet<_>>();
742
743    for output_sst in &compact_task.sorted_output_ssts {
744        assert!(
745            output_sst
746                .table_ids
747                .iter()
748                .all(|table_id| read_table_ids.contains(table_id)),
749            "compaction output contains table ids outside task read input: task_id={}, output_sst_id={}, read_table_ids={:?}, output_table_ids={:?}",
750            compact_task.task_id,
751            output_sst.sst_id,
752            read_table_ids,
753            output_sst.table_ids,
754        );
755    }
756}
757
/// Streams keys from `iter` into `sst_builder`, dropping keys according to the task
/// configuration and `compaction_filter`, and returns statistics about the run.
///
/// The iterator is first positioned at `task_config.key_range.left`, or rewound when that bound
/// is empty. It is then consumed until it is exhausted, or until a key that starts a new user key
/// is `>= task_config.key_range.right`. An empty right bound disables this check.
///
/// A key is dropped (not written) when any of the following holds:
/// - `retain_multiple_version` is false and the key is not the first version seen for its user key;
/// - `retain_multiple_version` is false, `gc_delete_keys` is true and the value is a delete;
/// - `compaction_filter.should_delete` returns true for it.
///
/// Some surviving `Put` values may be rewritten by `try_drop_invalid_columns` before being written.
/// This applies only when the value's table has a schema in `task_config.table_schemas` and its
/// (object, block) origin is known.
///
/// The returned `CompactionStatistics` includes the total and dropped iteration counts. Its
/// `delta_drop_stat` holds per-table negative deltas (key count, key size, value size) caused by
/// dropped keys and by column-drop rewrites. These are combined via `add_table_stats_map` with the
/// iterator's `skipped_by_watermark_table_stats`. Local iterator statistics are also reported to
/// `compactor_metrics`.
///
/// # Errors
///
/// Propagates any error from seeking, rewinding or advancing `iter`, and from
/// `sst_builder.add_full_key`.
///
/// # Panics
///
/// Panics in either of these cases:
/// - an object already recorded in the schema-check skip cache reports a smaller block id than
///   the recorded one;
/// - a column-drop rewrite makes a value larger.
pub async fn compact_and_build_sst<F>(
    sst_builder: &mut CapacitySplitTableBuilder<F>,
    task_config: &TaskConfig,
    compactor_metrics: Arc<CompactorMetrics>,
    mut iter: impl HummockIterator<Direction = Forward>,
    mut compaction_filter: impl CompactionFilter,
) -> HummockResult<CompactionStatistics>
where
    F: TableBuilderFactory,
{
    // Position `iter` at the task's left bound; an empty left bound means start from the
    // first key.
    if !task_config.key_range.left.is_empty() {
        let full_key = FullKey::decode(&task_config.key_range.left);
        iter.seek(full_key)
            .instrument_await("iter_seek".verbose())
            .await?;
    } else {
        iter.rewind().instrument_await("rewind".verbose()).await?;
    };

    // An empty right bound maps to `FullKey::default()`. The loop below only enforces the bound
    // when `max_key` is non-empty.
    let end_key = if task_config.key_range.right.is_empty() {
        FullKey::default()
    } else {
        FullKey::decode(&task_config.key_range.right).to_vec()
    };
    let max_key = end_key.to_ref();

    // Remembers the previously observed key so `observe` can report whether the current key
    // starts a new user key.
    let mut full_key_tracker = FullKeyTracker::<Vec<u8>>::new(FullKey::default());
    let mut local_stats = StoreLocalStatistic::default();

    // Keep table stats changes due to dropping KV.
    let mut table_stats_drop = TableStatsMap::default();
    // Deltas for the table currently being scanned. They are moved into `table_stats_drop` when
    // the table id changes, and once more after the loop.
    let mut last_table_stats = TableStats::default();
    let mut last_table_id = None;
    let mut compaction_statistics = CompactionStatistics::default();
    // object id -> block id. For an object id, block id is updated in a monotonically increasing manner.
    // An entry means the recorded block was already found to need no column drop, so the rest of
    // its values skip the schema check.
    let mut skip_schema_check: HashMap<HummockSstableObjectId, u64> = HashMap::default();
    // Table id -> column ids of that table's schema in `task_config.table_schemas`.
    let schemas: HashMap<TableId, HashSet<i32>> = task_config
        .table_schemas
        .iter()
        .map(|(table_id, schema)| (*table_id, schema.column_ids.iter().copied().collect()))
        .collect();
    while iter.is_valid() {
        let iter_key = iter.key();
        compaction_statistics.iter_total_key_counts += 1;

        let is_new_user_key = full_key_tracker.observe(iter.key());
        let mut drop = false;

        // CRITICAL WARN: Because of memtable spill, there may be several versions of the same user-key share the same `pure_epoch`. Do not change this code unless necessary.
        let value = iter.value();
        let ValueMeta {
            object_id,
            block_id,
        } = iter.value_meta();
        if is_new_user_key {
            // The right bound is exclusive and is only checked when a new user key begins.
            // Versions of one user key are therefore never split by it.
            if !max_key.is_empty() && iter_key >= max_key {
                break;
            }
            // Counted whether or not the delete is actually dropped below.
            if value.is_delete() {
                local_stats.skip_delete_key_count += 1;
            }
        } else {
            local_stats.skip_multi_version_key_count += 1;
        }

        // Flush the previous table's deltas when the table id changes.
        // NOTE(review): relies on all keys of one table being contiguous in `iter`. If a table id
        // reappeared later, `insert` would overwrite its earlier entry.
        if last_table_id != Some(iter_key.user_key.table_id) {
            if let Some(last_table_id) = last_table_id.take() {
                table_stats_drop.insert(last_table_id, std::mem::take(&mut last_table_stats));
            }
            last_table_id = Some(iter_key.user_key.table_id);
        }

        // Among keys with same user key, only keep the latest key unless retain_multiple_version is true.
        // In our design, frontend avoid to access keys which had be deleted, so we don't
        // need to consider the epoch when the compaction_filter match (it
        // means that mv had drop)
        // Because of memtable spill, there may be a PUT key share the same `pure_epoch` with DELETE key.
        // Do not assume that "the epoch of keys behind must be smaller than the current key."
        if (!task_config.retain_multiple_version && task_config.gc_delete_keys && value.is_delete())
            || (!task_config.retain_multiple_version && !is_new_user_key)
        {
            drop = true;
        }

        // The compaction filter is consulted only for keys the version rules above kept.
        if !drop && compaction_filter.should_delete(iter_key) {
            drop = true;
        }

        if drop {
            compaction_statistics.iter_drop_key_counts += 1;

            // Record the dropped key/value as negative deltas against the current table.
            last_table_stats.total_key_count -= 1;
            last_table_stats.total_key_size -= iter_key.encoded_len() as i64;
            last_table_stats.total_value_size -= iter.value().encoded_len() as i64;
            iter.next()
                .instrument_await("iter_next_in_drop".verbose())
                .await?;
            continue;
        }

        // May drop stale columns
        // The rewrite is attempted only when all of the following hold:
        // - the value is a `Put`;
        // - its (object, block) origin is known;
        // - that block is not already recorded in `skip_schema_check`;
        // - the table has a schema in the task.
        let check_table_id = iter_key.user_key.table_id;
        let mut is_value_rewritten = false;
        if let HummockValue::Put(v) = value
            && let Some(object_id) = object_id
            && let Some(block_id) = block_id
            && !skip_schema_check
                .get(&object_id)
                .map(|prev_block_id| {
                    assert!(*prev_block_id <= block_id);
                    *prev_block_id == block_id
                })
                .unwrap_or(false)
            && let Some(schema) = schemas.get(&check_table_id)
        {
            let value_size = v.len();
            match try_drop_invalid_columns(v, schema) {
                None => {
                    if !task_config.disable_drop_column_optimization {
                        // Under the assumption that all values in the same (object, block) group should share the same schema,
                        // if one value drops no columns during a compaction, no need to check other values in the same group.
                        skip_schema_check.insert(object_id, block_id);
                    }
                }
                Some(new_value) => {
                    // The rewritten value is written instead of the original one.
                    is_value_rewritten = true;
                    let new_put = HummockValue::put(new_value.as_slice());
                    sst_builder
                        .add_full_key(iter_key, new_put, is_new_user_key)
                        .instrument_await("add_rewritten_full_key".verbose())
                        .await?;
                    // A rewrite may only shrink the value. The saved bytes are recorded as a
                    // negative value-size delta for the current table.
                    let value_size_change = value_size as i64 - new_value.len() as i64;
                    assert!(value_size_change >= 0);
                    last_table_stats.total_value_size -= value_size_change;
                }
            }
        }

        if !is_value_rewritten {
            // Don't allow two SSTs to share same user key
            sst_builder
                .add_full_key(iter_key, value, is_new_user_key)
                .instrument_await("add_full_key".verbose())
                .await?;
        }

        iter.next().instrument_await("iter_next".verbose()).await?;
    }

    // Flush the deltas of the last table seen.
    if let Some(last_table_id) = last_table_id.take() {
        table_stats_drop.insert(last_table_id, std::mem::take(&mut last_table_stats));
    }
    // Pull the iterator's own statistics into `local_stats`, then fold its
    // `skipped_by_watermark_table_stats` into the per-table deltas.
    iter.collect_local_statistic(&mut local_stats);
    add_table_stats_map(
        &mut table_stats_drop,
        &local_stats.skipped_by_watermark_table_stats,
    );
    local_stats.report_compactor(compactor_metrics.as_ref());
    compaction_statistics.delta_drop_stat = table_stats_drop;

    Ok(compaction_statistics)
}
920
#[cfg(test)]
pub mod tests {
    use risingwave_hummock_sdk::can_concat;

    use crate::hummock::compactor::compactor_runner::partition_overlapping_sstable_infos;
    use crate::hummock::iterator::test_utils::mock_sstable_store;
    use crate::hummock::test_utils::{
        default_builder_opt_for_test, gen_test_sstable_info, test_key_of, test_value_of,
    };
    use crate::hummock::value::HummockValue;

    /// Builds ten SSTs that each cover `2 * TEST_KEYS_COUNT` keys. Each SST starts
    /// `TEST_KEYS_COUNT` keys after the previous one, so it overlaps only its direct neighbours.
    ///
    /// Partitioning should therefore yield exactly two groups, even-indexed and odd-indexed SSTs.
    /// Each group must be concatenable.
    #[tokio::test]
    async fn test_partition_overlapping_level() {
        const TEST_KEYS_COUNT: usize = 10;
        let sstable_store = mock_sstable_store().await;

        let mut sst_infos = Vec::with_capacity(10);
        for idx in 0..10usize {
            let first_key = idx * TEST_KEYS_COUNT;
            let key_indices = first_key..first_key + 2 * TEST_KEYS_COUNT;
            let kv_pairs =
                key_indices.map(|i| (test_key_of(i), HummockValue::put(test_value_of(i))));
            let sst_info = gen_test_sstable_info(
                default_builder_opt_for_test(),
                idx as u64,
                kv_pairs,
                sstable_store.clone(),
            )
            .await;
            sst_infos.push(sst_info);
        }

        let partitions = partition_overlapping_sstable_infos(sst_infos);
        assert_eq!(partitions.len(), 2);
        for group in &partitions {
            assert!(can_concat(group));
        }
    }
}