risingwave_storage/hummock/compactor/
compactor_runner.rs

// Copyright 2022 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BinaryHeap, HashMap, HashSet};
use std::sync::Arc;

use await_tree::{InstrumentAwait, SpanExt};
use bytes::Bytes;
use futures::{FutureExt, StreamExt, stream};
use itertools::Itertools;
use risingwave_common::catalog::TableId;
use risingwave_common::util::value_encoding::column_aware_row_encoding::try_drop_invalid_columns;
use risingwave_hummock_sdk::compact::{
    compact_task_to_string, estimate_memory_for_compact_task, statistics_compact_task,
};
use risingwave_hummock_sdk::compact_task::CompactTask;
use risingwave_hummock_sdk::compaction_group::StateTableId;
use risingwave_hummock_sdk::key::{FullKey, FullKeyTracker};
use risingwave_hummock_sdk::key_range::{KeyRange, KeyRangeCommon};
use risingwave_hummock_sdk::sstable_info::SstableInfo;
use risingwave_hummock_sdk::table_stats::{TableStats, TableStatsMap, add_table_stats_map};
use risingwave_hummock_sdk::{
    HummockSstableObjectId, KeyComparator, can_concat, compact_task_output_to_string,
    full_key_can_concat,
};
use risingwave_pb::hummock::LevelType;
use risingwave_pb::hummock::compact_task::TaskStatus;
use thiserror_ext::AsReport;
use tokio::sync::oneshot::Receiver;

use super::iterator::MonitoredCompactorIterator;
use super::task_progress::TaskProgress;
use super::{CompactionStatistics, TaskConfig};
use crate::compaction_catalog_manager::{CompactionCatalogAgentRef, CompactionCatalogManagerRef};
use crate::hummock::compactor::compaction_utils::{
    build_multi_compaction_filter, estimate_task_output_capacity, generate_splits_for_task,
    metrics_report_for_task, optimize_by_copy_block,
};
use crate::hummock::compactor::iterator::ConcatSstableIterator;
use crate::hummock::compactor::task_progress::TaskProgressGuard;
use crate::hummock::compactor::{
    CompactOutput, CompactionFilter, Compactor, CompactorContext, await_tree_key,
    fast_compactor_runner,
};
use crate::hummock::iterator::{
    Forward, HummockIterator, MergeIterator, NonPkPrefixSkipWatermarkIterator,
    NonPkPrefixSkipWatermarkState, PkPrefixSkipWatermarkIterator, PkPrefixSkipWatermarkState,
    ValueMeta,
};
use crate::hummock::multi_builder::{CapacitySplitTableBuilder, TableBuilderFactory};
use crate::hummock::utils::MemoryTracker;
use crate::hummock::value::HummockValue;
use crate::hummock::{
    CachePolicy, CompressionAlgorithm, GetObjectId, HummockResult, SstableBuilderOptions,
    SstableStoreRef,
};
use crate::monitor::{CompactorMetrics, StoreLocalStatistic};
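
/// Runs one split (key-range shard) of a compaction task: builds the merge iterator over the
/// split's input SSTs and drives `Compactor::compact_key_range` to produce the output SSTs
/// for that split.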
pub struct CompactorRunner {
    compact_task: CompactTask,
    compactor: Compactor,
    sstable_store: SstableStoreRef,
    key_range: KeyRange,
    split_index: usize,
}

impl CompactorRunner {
    pub fn new(
        split_index: usize,
        context: CompactorContext,
        task: CompactTask,
        object_id_getter: Arc<dyn GetObjectId>,
    ) -> Self {
        let mut options: SstableBuilderOptions = context.storage_opts.as_ref().into();
        options.compression_algorithm = match task.compression_algorithm {
            0 => CompressionAlgorithm::None,
            1 => CompressionAlgorithm::Lz4,
            _ => CompressionAlgorithm::Zstd,
        };

        options.capacity = estimate_task_output_capacity(context.clone(), &task);
        let use_block_based_filter = task.should_use_block_based_filter();

        let key_range = KeyRange {
            left: task.splits[split_index].left.clone(),
            right: task.splits[split_index].right.clone(),
            right_exclusive: true,
        };

        let compactor = Compactor::new(
            context.clone(),
            options,
            TaskConfig {
                key_range: key_range.clone(),
                cache_policy: CachePolicy::NotFill,
                gc_delete_keys: task.gc_delete_keys,
                retain_multiple_version: false,
                stats_target_table_ids: Some(HashSet::from_iter(task.existing_table_ids.clone())),
                task_type: task.task_type,
                use_block_based_filter,
                table_vnode_partition: task.table_vnode_partition.clone(),
                table_schemas: task
                    .table_schemas
                    .iter()
                    .map(|(k, v)| (*k, v.clone()))
                    .collect(),
                disable_drop_column_optimization: false,
            },
            object_id_getter,
        );

        Self {
            compactor,
            compact_task: task,
            sstable_store: context.sstable_store,
            key_range,
            split_index,
        }
    }

    pub async fn run(
        &self,
        compaction_filter: impl CompactionFilter,
        compaction_catalog_agent_ref: CompactionCatalogAgentRef,
        task_progress: Arc<TaskProgress>,
    ) -> HummockResult<CompactOutput> {
        let iter =
            self.build_sst_iter(task_progress.clone(), compaction_catalog_agent_ref.clone())?;
        let (ssts, compaction_stat) = self
            .compactor
            .compact_key_range(
                iter,
                compaction_filter,
                compaction_catalog_agent_ref,
                Some(task_progress),
                Some(self.compact_task.task_id),
                Some(self.split_index),
            )
            .await?;
        Ok((self.split_index, ssts, compaction_stat))
    }

    /// Build the merge iterator based on the given input SSTs.
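    ///
    /// For a non-overlapping level, all SSTs are read through a single `ConcatSstableIterator`;
    /// an overlapping level with too many SSTs is first partitioned into concatenable groups
    /// (see `partition_overlapping_sstable_infos`), and a small overlapping level gets one
    /// iterator per SST. The result is wrapped in the watermark-skipping iterators below.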
    fn build_sst_iter(
        &self,
        task_progress: Arc<TaskProgress>,
        compaction_catalog_agent_ref: CompactionCatalogAgentRef,
    ) -> HummockResult<impl HummockIterator<Direction = Forward> + use<>> {
        let compactor_iter_max_io_retry_times = self
            .compactor
            .context
            .storage_opts
            .compactor_iter_max_io_retry_times;
        let mut table_iters = Vec::new();
        for level in &self.compact_task.input_ssts {
            if level.table_infos.is_empty() {
                continue;
            }

            let tables = level
                .table_infos
                .iter()
                .filter(|table_info| {
                    let table_ids = &table_info.table_ids;
                    let exist_table = table_ids
                        .iter()
                        .any(|table_id| self.compact_task.existing_table_ids.contains(table_id));

                    self.key_range.full_key_overlap(&table_info.key_range) && exist_table
                })
                .cloned()
                .collect_vec();
            // No need to filter the tables here; the manager has already done it.
            if level.level_type == LevelType::Nonoverlapping {
                debug_assert!(can_concat(&level.table_infos));
                table_iters.push(ConcatSstableIterator::new(
                    self.compact_task.existing_table_ids.clone(),
                    tables,
                    self.compactor.task_config.key_range.clone(),
                    self.sstable_store.clone(),
                    task_progress.clone(),
                    compactor_iter_max_io_retry_times,
                ));
            } else if tables.len()
                > self
                    .compactor
                    .context
                    .storage_opts
                    .compactor_max_overlap_sst_count
            {
                let sst_groups = partition_overlapping_sstable_infos(tables);
                tracing::warn!(
                    "COMPACT A LARGE OVERLAPPING LEVEL: partitioning {} ssts into {} groups",
                    level.table_infos.len(),
                    sst_groups.len()
                );
                for (idx, table_infos) in sst_groups.into_iter().enumerate() {
                    // Overlapping SSTables may contain SSTs with the same user key (generated by memtable spill), so concatenation must be checked with the full key.
                    assert!(
                        full_key_can_concat(&table_infos),
                        "sst_group idx {:?} table_infos: {:?}",
                        idx,
                        table_infos
                    );
                    table_iters.push(ConcatSstableIterator::new(
                        self.compact_task.existing_table_ids.clone(),
                        table_infos,
                        self.compactor.task_config.key_range.clone(),
                        self.sstable_store.clone(),
                        task_progress.clone(),
                        compactor_iter_max_io_retry_times,
                    ));
                }
            } else {
                for table_info in tables {
                    table_iters.push(ConcatSstableIterator::new(
                        self.compact_task.existing_table_ids.clone(),
                        vec![table_info],
                        self.compactor.task_config.key_range.clone(),
                        self.sstable_store.clone(),
                        task_progress.clone(),
                        compactor_iter_max_io_retry_times,
                    ));
                }
            }
        }

        // The `Pk/NonPkPrefixSkipWatermarkIterator` is used to handle the table watermark state cleaning introduced
        // in https://github.com/risingwavelabs/risingwave/issues/13148
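        // These iterators skip key-value pairs that fall below the per-table watermarks, so the
        // corresponding stale state is cleaned up as a side effect of compaction.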
        let combine_iter = {
            let skip_watermark_iter = PkPrefixSkipWatermarkIterator::new(
                MonitoredCompactorIterator::new(
                    MergeIterator::for_compactor(table_iters),
                    task_progress,
                ),
                PkPrefixSkipWatermarkState::from_safe_epoch_watermarks(
                    self.compact_task.pk_prefix_table_watermarks.clone(),
                ),
            );

            NonPkPrefixSkipWatermarkIterator::new(
                skip_watermark_iter,
                NonPkPrefixSkipWatermarkState::from_safe_epoch_watermarks(
                    self.compact_task.non_pk_prefix_table_watermarks.clone(),
                    compaction_catalog_agent_ref,
                ),
            )
        };

        Ok(combine_iter)
    }
}

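/// Greedily partitions overlapping SSTs into groups whose members do not overlap with each
/// other, so that each group can be read through a `ConcatSstableIterator`. SSTs are sorted by
/// left bound; each SST is appended to the existing group with the smallest right bound if that
/// bound ends before the SST's left bound, otherwise a new group is opened.
///
/// For example, SSTs with key ranges [1, 5], [3, 8], [6, 9] yield two groups:
/// {[1, 5], [6, 9]} and {[3, 8]}.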
pub fn partition_overlapping_sstable_infos(
    mut origin_infos: Vec<SstableInfo>,
) -> Vec<Vec<SstableInfo>> {
    pub struct SstableGroup {
        ssts: Vec<SstableInfo>,
        max_right_bound: Bytes,
    }

    impl PartialEq for SstableGroup {
        fn eq(&self, other: &SstableGroup) -> bool {
            self.max_right_bound == other.max_right_bound
        }
    }
    impl PartialOrd for SstableGroup {
        fn partial_cmp(&self, other: &SstableGroup) -> Option<std::cmp::Ordering> {
            Some(self.cmp(other))
        }
    }
    impl Eq for SstableGroup {}
    impl Ord for SstableGroup {
        fn cmp(&self, other: &SstableGroup) -> std::cmp::Ordering {
            // Reverse the comparison so that the max-heap `BinaryHeap` yields the group with the smallest right bound first.
            KeyComparator::compare_encoded_full_key(&other.max_right_bound, &self.max_right_bound)
        }
    }
    let mut groups: BinaryHeap<SstableGroup> = BinaryHeap::default();
    origin_infos.sort_by(|a, b| {
        let x = &a.key_range;
        let y = &b.key_range;
        KeyComparator::compare_encoded_full_key(&x.left, &y.left)
    });
    for sst in origin_infos {
        // For each new SST, try the group with the smallest right bound first: if even that group does not end before this SST's left bound, no other group does, so open a new group.
        if let Some(mut prev_group) = groups.peek_mut()
            && KeyComparator::encoded_full_key_less_than(
                &prev_group.max_right_bound,
                &sst.key_range.left,
            )
        {
            prev_group.max_right_bound.clone_from(&sst.key_range.right);
            prev_group.ssts.push(sst);
            continue;
        }
        groups.push(SstableGroup {
            max_right_bound: sst.key_range.right.clone(),
            ssts: vec![sst],
        });
    }
    assert!(!groups.is_empty());
    groups.into_iter().map(|group| group.ssts).collect_vec()
}

/// Handles a compaction task and reports its status to the hummock manager.
/// Errors are not propagated: failures are reflected in the reported task status and left to the hummock manager to handle.
pub async fn compact_with_agent(
    compactor_context: CompactorContext,
    mut compact_task: CompactTask,
    mut shutdown_rx: Receiver<()>,
    object_id_getter: Arc<dyn GetObjectId>,
    compaction_catalog_agent_ref: CompactionCatalogAgentRef,
) -> (
    (
        CompactTask,
        HashMap<TableId, TableStats>,
        HashMap<HummockSstableObjectId, u64>,
    ),
    Option<MemoryTracker>,
) {
    let context = compactor_context.clone();
    let group_label = compact_task.compaction_group_id.to_string();
    metrics_report_for_task(&compact_task, &context);

    let timer = context
        .compactor_metrics
        .compact_task_duration
        .with_label_values(&[
            &group_label,
            &compact_task.input_ssts[0].level_idx.to_string(),
        ])
        .start_timer();

    let multi_filter = build_multi_compaction_filter(&compact_task);
    let mut task_status = TaskStatus::Success;
    let optimize_by_copy_block = optimize_by_copy_block(&compact_task, &context);

    if let Err(e) =
        generate_splits_for_task(&mut compact_task, &context, optimize_by_copy_block).await
    {
        tracing::warn!(error = %e.as_report(), "Failed to generate_splits");
        task_status = TaskStatus::ExecuteFailed;
        return (
            compact_done(compact_task, context.clone(), vec![], task_status),
            None,
        );
    }

    let compact_task_statistics = statistics_compact_task(&compact_task);
    // The number of splits (key ranges) equals the number of parallel compaction sub-tasks.
    let parallelism = compact_task.splits.len();
    assert_ne!(parallelism, 0, "splits cannot be empty");
    let mut output_ssts = Vec::with_capacity(parallelism);
    let mut compaction_futures = vec![];
    let mut abort_handles = vec![];
    let task_progress_guard =
        TaskProgressGuard::new(compact_task.task_id, context.task_progress_manager.clone());

    let capacity = estimate_task_output_capacity(context.clone(), &compact_task);

    let task_memory_capacity_with_parallelism = estimate_memory_for_compact_task(
        &compact_task,
        (context.storage_opts.block_size_kb as u64) * (1 << 10),
        context
            .storage_opts
            .object_store_config
            .s3
            .recv_buffer_size
            .unwrap_or(6 * 1024 * 1024) as u64,
        capacity as u64,
    ) * compact_task.splits.len() as u64;

    tracing::info!(
        "Ready to handle task: {} compact_task_statistics {:?} compression_algorithm {:?}  parallelism {} task_memory_capacity_with_parallelism {}, enable fast runner: {}, {}",
        compact_task.task_id,
        compact_task_statistics,
        compact_task.compression_algorithm,
        parallelism,
        task_memory_capacity_with_parallelism,
        optimize_by_copy_block,
        compact_task_to_string(&compact_task),
    );

    // If there is not enough memory for the task, cancel it and let the meta node
    // reschedule it, so that it does not occupy the compactor's resources.
    let memory_detector = context
        .memory_limiter
        .try_require_memory(task_memory_capacity_with_parallelism);
    if memory_detector.is_none() {
        tracing::warn!(
            "Not enough memory to serve the task {} task_memory_capacity_with_parallelism {}  memory_usage {} memory_quota {}",
            compact_task.task_id,
            task_memory_capacity_with_parallelism,
            context.memory_limiter.get_memory_usage(),
            context.memory_limiter.quota()
        );
        task_status = TaskStatus::NoAvailMemoryResourceCanceled;
        return (
            compact_done(compact_task, context.clone(), output_ssts, task_status),
            memory_detector,
        );
    }

    context.compactor_metrics.compact_task_pending_num.inc();
    context
        .compactor_metrics
        .compact_task_pending_parallelism
        .add(parallelism as _);
    let _release_metrics_guard =
        scopeguard::guard((parallelism, context.clone()), |(parallelism, context)| {
            context.compactor_metrics.compact_task_pending_num.dec();
            context
                .compactor_metrics
                .compact_task_pending_parallelism
                .sub(parallelism as _);
        });

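    // Fast path: when `optimize_by_copy_block` is set, handle the whole task with the dedicated
    // fast compactor runner instead of spawning one runner per split.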
    if optimize_by_copy_block {
        let runner = fast_compactor_runner::CompactorRunner::new(
            context.clone(),
            compact_task.clone(),
            compaction_catalog_agent_ref.clone(),
            object_id_getter.clone(),
            task_progress_guard.progress.clone(),
            multi_filter,
        );

        tokio::select! {
            _ = &mut shutdown_rx => {
                tracing::warn!("Compaction task cancelled externally:\n{}", compact_task_to_string(&compact_task));
                task_status = TaskStatus::ManualCanceled;
            },

            ret = runner.run() => {
                match ret {
                    Ok((ssts, statistics)) => {
                        output_ssts.push((0, ssts, statistics));
                    }
                    Err(e) => {
                        task_status = TaskStatus::ExecuteFailed;
                        tracing::warn!(
                            error = %e.as_report(),
                            "Compaction task {} failed with error",
                            compact_task.task_id,
                        );
                    }
                }
            }
        }

        // After a compaction is done, mutate the compaction task.
        let (compact_task, table_stats, object_timestamps) =
            compact_done(compact_task, context.clone(), output_ssts, task_status);
        let cost_time = timer.stop_and_record() * 1000.0;
        tracing::info!(
            "Finished fast compaction task in {:?}ms: {}",
            cost_time,
            compact_task_to_string(&compact_task)
        );
        return (
            (compact_task, table_stats, object_timestamps),
            memory_detector,
        );
    }
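
    // Regular path: spawn one `CompactorRunner` per split and collect their results concurrently.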
    for (split_index, _) in compact_task.splits.iter().enumerate() {
        let filter = multi_filter.clone();
        let compaction_catalog_agent_ref = compaction_catalog_agent_ref.clone();
        let compactor_runner = CompactorRunner::new(
            split_index,
            compactor_context.clone(),
            compact_task.clone(),
            object_id_getter.clone(),
        );
        let task_progress = task_progress_guard.progress.clone();
        let runner = async move {
            compactor_runner
                .run(filter, compaction_catalog_agent_ref, task_progress)
                .await
        };
        let traced = match context.await_tree_reg.as_ref() {
            None => runner.right_future(),
            Some(await_tree_reg) => await_tree_reg
                .register(
                    await_tree_key::CompactRunner {
                        task_id: compact_task.task_id,
                        split_index,
                    },
                    format!(
                        "Compaction Task {} Split {} ",
                        compact_task.task_id, split_index
                    ),
                )
                .instrument(runner)
                .left_future(),
        };
        let handle = tokio::spawn(traced);
        abort_handles.push(handle.abort_handle());
        compaction_futures.push(handle);
    }

    let mut buffered = stream::iter(compaction_futures).buffer_unordered(parallelism);
    loop {
        tokio::select! {
            _ = &mut shutdown_rx => {
                tracing::warn!("Compaction task cancelled externally:\n{}", compact_task_to_string(&compact_task));
                task_status = TaskStatus::ManualCanceled;
                break;
            }
            future_result = buffered.next() => {
                match future_result {
                    Some(Ok(Ok((split_index, ssts, compact_stat)))) => {
                        output_ssts.push((split_index, ssts, compact_stat));
                    }
                    Some(Ok(Err(e))) => {
                        task_status = TaskStatus::ExecuteFailed;
                        tracing::warn!(
                            error = %e.as_report(),
                            "Compaction task {} failed with error",
                            compact_task.task_id,
                        );
                        break;
                    }
                    Some(Err(e)) => {
                        task_status = TaskStatus::JoinHandleFailed;
                        tracing::warn!(
                            error = %e.as_report(),
                            "Compaction task {} failed with join handle error",
                            compact_task.task_id,
                        );
                        break;
                    }
                    None => break,
                }
            }
        }
    }

    if task_status != TaskStatus::Success {
        for abort_handle in abort_handles {
            abort_handle.abort();
        }
        output_ssts.clear();
    }
    // Sort by split/key range index.
    if !output_ssts.is_empty() {
        output_ssts.sort_by_key(|(split_index, ..)| *split_index);
    }

    // After a compaction is done, mutate the compaction task.
    let (compact_task, table_stats, object_timestamps) =
        compact_done(compact_task, context.clone(), output_ssts, task_status);
    let cost_time = timer.stop_and_record() * 1000.0;
    tracing::info!(
        "Finished compaction task in {:?}ms: {}",
        cost_time,
        compact_task_output_to_string(&compact_task)
    );
    (
        (compact_task, table_stats, object_timestamps),
        memory_detector,
    )
}

/// Handles a compaction task and reports its status to the hummock manager.
/// Errors are not propagated: failures are reflected in the reported task status and left to the hummock manager to handle.
pub async fn compact(
    compactor_context: CompactorContext,
    mut compact_task: CompactTask,
    shutdown_rx: Receiver<()>,
    object_id_getter: Arc<dyn GetObjectId>,
    compaction_catalog_manager_ref: CompactionCatalogManagerRef,
) -> (
    (
        CompactTask,
        HashMap<TableId, TableStats>,
        HashMap<HummockSstableObjectId, u64>,
    ),
    Option<MemoryTracker>,
) {
    let table_ids_to_be_compacted = compact_task.build_compact_table_ids();
    let compaction_catalog_agent_ref = match compaction_catalog_manager_ref
        .acquire(table_ids_to_be_compacted.clone())
        .await
    {
        Ok(compaction_catalog_agent_ref) => {
            let acquire_table_ids: HashSet<StateTableId> =
                compaction_catalog_agent_ref.table_ids().collect();
            if acquire_table_ids.len() != table_ids_to_be_compacted.len() {
                let diff = table_ids_to_be_compacted
                    .into_iter()
                    .collect::<HashSet<_>>()
                    .symmetric_difference(&acquire_table_ids)
                    .cloned()
                    .collect::<Vec<_>>();
                tracing::warn!(
                    diff = ?diff,
                    "Some table ids were not acquired."
                );
                return (
                    compact_done(
                        compact_task,
                        compactor_context.clone(),
                        vec![],
                        TaskStatus::ExecuteFailed,
                    ),
                    None,
                );
            }

            compaction_catalog_agent_ref
        }
        Err(e) => {
            tracing::warn!(
                error = %e.as_report(),
                "Failed to acquire compaction catalog agent"
            );
            return (
                compact_done(
                    compact_task,
                    compactor_context.clone(),
                    vec![],
                    TaskStatus::ExecuteFailed,
                ),
                None,
            );
        }
    };

    // Rewrite the compact_task watermarks: keep only the watermarks of tables that are being compacted.
    {
        compact_task
            .pk_prefix_table_watermarks
            .retain(|table_id, _| table_ids_to_be_compacted.contains(table_id));

        compact_task
            .non_pk_prefix_table_watermarks
            .retain(|table_id, _| table_ids_to_be_compacted.contains(table_id));
    }

    compact_with_agent(
        compactor_context,
        compact_task,
        shutdown_rx,
        object_id_getter,
        compaction_catalog_agent_ref,
    )
    .await
}

/// Fills in the compact task and tries to report the task result to the meta node.
pub(crate) fn compact_done(
    mut compact_task: CompactTask,
    context: CompactorContext,
    output_ssts: Vec<CompactOutput>,
    task_status: TaskStatus,
) -> (
    CompactTask,
    HashMap<TableId, TableStats>,
    HashMap<HummockSstableObjectId, u64>,
) {
    let mut table_stats_map = TableStatsMap::default();
    let mut object_timestamps = HashMap::default();
    compact_task.task_status = task_status;
    compact_task
        .sorted_output_ssts
        .reserve(compact_task.splits.len());
    let mut compaction_write_bytes = 0;
    for (
        _,
        ssts,
        CompactionStatistics {
            delta_drop_stat, ..
        },
    ) in output_ssts
    {
        add_table_stats_map(&mut table_stats_map, &delta_drop_stat);
        for sst_info in ssts {
            compaction_write_bytes += sst_info.file_size();
            object_timestamps.insert(sst_info.sst_info.object_id, sst_info.created_at);
            compact_task.sorted_output_ssts.push(sst_info.sst_info);
        }
    }

    let group_label = compact_task.compaction_group_id.to_string();
    let level_label = compact_task.target_level.to_string();
    context
        .compactor_metrics
        .compact_write_bytes
        .with_label_values(&[&group_label, &level_label])
        .inc_by(compaction_write_bytes);
    context
        .compactor_metrics
        .compact_write_sstn
        .with_label_values(&[&group_label, &level_label])
        .inc_by(compact_task.sorted_output_ssts.len() as u64);

    (compact_task, table_stats_map, object_timestamps)
}

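/// Drives the core compaction loop for a compaction split: iterates over the merged input,
/// drops deleted keys and stale versions according to the `TaskConfig`, optionally rewrites
/// values to drop columns that are no longer in the table schema, and feeds the surviving
/// key-value pairs into the SST builder. Returns the statistics collected along the way.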
pub async fn compact_and_build_sst<F>(
    sst_builder: &mut CapacitySplitTableBuilder<F>,
    task_config: &TaskConfig,
    compactor_metrics: Arc<CompactorMetrics>,
    mut iter: impl HummockIterator<Direction = Forward>,
    mut compaction_filter: impl CompactionFilter,
) -> HummockResult<CompactionStatistics>
where
    F: TableBuilderFactory,
{
    if !task_config.key_range.left.is_empty() {
        let full_key = FullKey::decode(&task_config.key_range.left);
        iter.seek(full_key)
            .instrument_await("iter_seek".verbose())
            .await?;
    } else {
        iter.rewind().instrument_await("rewind".verbose()).await?;
    };

    let end_key = if task_config.key_range.right.is_empty() {
        FullKey::default()
    } else {
        FullKey::decode(&task_config.key_range.right).to_vec()
    };
    let max_key = end_key.to_ref();

    let mut full_key_tracker = FullKeyTracker::<Vec<u8>>::new(FullKey::default());
    let mut local_stats = StoreLocalStatistic::default();

    // Keep table stats changes due to dropping KV.
    let mut table_stats_drop = TableStatsMap::default();
    let mut last_table_stats = TableStats::default();
    let mut last_table_id = None;
    let mut compaction_statistics = CompactionStatistics::default();
    // object id -> block id. For an object id, block id is updated in a monotonically increasing manner.
    let mut skip_schema_check: HashMap<HummockSstableObjectId, u64> = HashMap::default();
    let schemas: HashMap<TableId, HashSet<i32>> = task_config
        .table_schemas
        .iter()
        .map(|(table_id, schema)| (*table_id, schema.column_ids.iter().copied().collect()))
        .collect();
    while iter.is_valid() {
        let iter_key = iter.key();
        compaction_statistics.iter_total_key_counts += 1;

        let is_new_user_key = full_key_tracker.observe(iter.key());
        let mut drop = false;

        // CRITICAL WARN: Because of memtable spill, several versions of the same user key may share the same `pure_epoch`. Do not change this code unless necessary.
        let value = iter.value();
        let ValueMeta {
            object_id,
            block_id,
        } = iter.value_meta();
        if is_new_user_key {
            if !max_key.is_empty() && iter_key >= max_key {
                break;
            }
            if value.is_delete() {
                local_stats.skip_delete_key_count += 1;
            }
        } else {
            local_stats.skip_multi_version_key_count += 1;
        }

        if last_table_id != Some(iter_key.user_key.table_id) {
            if let Some(last_table_id) = last_table_id.take() {
                table_stats_drop.insert(last_table_id, std::mem::take(&mut last_table_stats));
            }
            last_table_id = Some(iter_key.user_key.table_id);
        }

        // Among keys with the same user key, only keep the latest one unless `retain_multiple_version` is true.
        // In our design, the frontend avoids accessing keys that have been deleted, so we don't
        // need to consider the epoch when the compaction_filter matches (a match means
        // the materialized view has been dropped).
        // Because of memtable spill, a PUT key may share the same `pure_epoch` with a DELETE key.
        // Do not assume that the epoch of subsequent keys must be smaller than that of the current key.
        if (!task_config.retain_multiple_version && task_config.gc_delete_keys && value.is_delete())
            || (!task_config.retain_multiple_version && !is_new_user_key)
        {
            drop = true;
        }

        if !drop && compaction_filter.should_delete(iter_key) {
            drop = true;
        }

        if drop {
            compaction_statistics.iter_drop_key_counts += 1;

            let should_count = match task_config.stats_target_table_ids.as_ref() {
                Some(target_table_ids) => target_table_ids.contains(&iter_key.user_key.table_id),
                None => true,
            };
            if should_count {
                last_table_stats.total_key_count -= 1;
                last_table_stats.total_key_size -= iter_key.encoded_len() as i64;
                last_table_stats.total_value_size -= iter.value().encoded_len() as i64;
            }
            iter.next()
                .instrument_await("iter_next_in_drop".verbose())
                .await?;
            continue;
        }

        // Try to drop columns that are no longer in the table schema (stale columns).
        let check_table_id = iter_key.user_key.table_id;
        let mut is_value_rewritten = false;
        if let HummockValue::Put(v) = value
            && let Some(object_id) = object_id
            && let Some(block_id) = block_id
            && !skip_schema_check
                .get(&object_id)
                .map(|prev_block_id| {
                    assert!(*prev_block_id <= block_id);
                    *prev_block_id == block_id
                })
                .unwrap_or(false)
            && let Some(schema) = schemas.get(&check_table_id)
        {
            let value_size = v.len();
            match try_drop_invalid_columns(v, schema) {
                None => {
                    if !task_config.disable_drop_column_optimization {
                        // Under the assumption that all values in the same (object, block) group should share the same schema,
                        // if one value drops no columns during a compaction, no need to check other values in the same group.
                        skip_schema_check.insert(object_id, block_id);
                    }
                }
                Some(new_value) => {
                    is_value_rewritten = true;
                    let new_put = HummockValue::put(new_value.as_slice());
                    sst_builder
                        .add_full_key(iter_key, new_put, is_new_user_key)
                        .instrument_await("add_rewritten_full_key".verbose())
                        .await?;
                    let value_size_change = value_size as i64 - new_value.len() as i64;
                    assert!(value_size_change >= 0);
                    last_table_stats.total_value_size -= value_size_change;
                }
            }
        }

        if !is_value_rewritten {
            // Don't allow two SSTs to share the same user key.
            sst_builder
                .add_full_key(iter_key, value, is_new_user_key)
                .instrument_await("add_full_key".verbose())
                .await?;
        }

        iter.next().instrument_await("iter_next".verbose()).await?;
    }

    if let Some(last_table_id) = last_table_id.take() {
        table_stats_drop.insert(last_table_id, std::mem::take(&mut last_table_stats));
    }
    iter.collect_local_statistic(&mut local_stats);
    add_table_stats_map(
        &mut table_stats_drop,
        &local_stats.skipped_by_watermark_table_stats,
    );
    local_stats.report_compactor(compactor_metrics.as_ref());
    compaction_statistics.delta_drop_stat = table_stats_drop;

    Ok(compaction_statistics)
}

#[cfg(test)]
pub mod tests {
    use risingwave_hummock_sdk::can_concat;

    use crate::hummock::compactor::compactor_runner::partition_overlapping_sstable_infos;
    use crate::hummock::iterator::test_utils::mock_sstable_store;
    use crate::hummock::test_utils::{
        default_builder_opt_for_test, gen_test_sstable_info, test_key_of, test_value_of,
    };
    use crate::hummock::value::HummockValue;

    #[tokio::test]
    async fn test_partition_overlapping_level() {
        const TEST_KEYS_COUNT: usize = 10;
        let sstable_store = mock_sstable_store().await;
        let mut table_infos = vec![];
        for object_id in 0..10 {
            let start_index = object_id * TEST_KEYS_COUNT;
            let end_index = start_index + 2 * TEST_KEYS_COUNT;
            let table_info = gen_test_sstable_info(
                default_builder_opt_for_test(),
                object_id as u64,
                (start_index..end_index)
                    .map(|i| (test_key_of(i), HummockValue::put(test_value_of(i)))),
                sstable_store.clone(),
            )
            .await;
            table_infos.push(table_info);
        }
        let table_infos = partition_overlapping_sstable_infos(table_infos);
        assert_eq!(table_infos.len(), 2);
        for ssts in table_infos {
            assert!(can_concat(&ssts));
        }
    }
}