Skip to main content

risingwave_hummock_sdk/
compact_task.rs

1// Copyright 2024 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, HashMap};
16
17use itertools::Itertools;
18use risingwave_common::catalog::TableId;
19use risingwave_pb::hummock::compact_task::{PbTaskStatus, PbTaskType, TaskStatus, TaskType};
20use risingwave_pb::hummock::subscribe_compaction_event_request::PbReportTask;
21use risingwave_pb::hummock::{
22    CompactTaskAssignment as PbCompactTaskAssignment, LevelType, PbCompactTask, PbKeyRange,
23    PbSstableFilterLayout, PbSstableFilterType, PbTableOption, PbTableSchema, PbTableStats,
24    PbValidationTask,
25};
26use risingwave_pb::id::WorkerId;
27
28use crate::compaction_group::StateTableId;
29use crate::key_range::KeyRange;
30use crate::level::InputLevel;
31use crate::sstable_info::SstableInfo;
32use crate::table_watermark::{TableWatermarks, WatermarkSerdeType};
33use crate::{CompactionGroupId, HummockContextId, HummockSstableObjectId};
34
35#[derive(Clone, PartialEq, Default, Debug)]
36pub struct CompactTask {
37    /// SSTs selected by meta, which will be removed from LSM after compaction succeeds.
38    ///
39    /// Each SST's `table_ids` comes from version metadata, where truncate/drop table deltas prune
40    /// table ids before later compaction tasks are picked.
41    pub input_ssts: Vec<InputLevel>,
42    /// In ideal case, the compaction will generate `splits.len()` tables which have key range
43    /// corresponding to that in `splits`, respectively
44    pub splits: Vec<KeyRange>,
45    /// compaction output, which will be added to `target_level` of LSM after compaction
46    pub sorted_output_ssts: Vec<SstableInfo>,
47    /// task id assigned by hummock storage service
48    pub task_id: u64,
49    /// compaction output will be added to `target_level` of LSM after compaction
50    pub target_level: u32,
51    pub gc_delete_keys: bool,
52    /// Lbase in LSM
53    pub base_level: u32,
54    pub task_status: PbTaskStatus,
55    /// compaction group the task belongs to.
56    pub compaction_group_id: CompactionGroupId,
57    /// compaction group id when the compaction task is created
58    pub compaction_group_version_id: u64,
59    /// Table ids that still exist in the version when the compaction task is picked.
60    ///
61    /// This may be broader than the table ids touched by this task's input SSTs. Task-local table
62    /// ids should be read from `input_ssts[*].table_ids`.
63    pub existing_table_ids: Vec<TableId>,
64    pub compression_algorithm: u32,
65    pub target_file_size: u64,
66    pub compaction_filter_mask: u32,
67    pub table_options: BTreeMap<TableId, PbTableOption>,
68    pub current_epoch_time: u64,
69    pub target_sub_level_id: u64,
70    /// Identifies whether the task is `space_reclaim`, if the `compact_task_type` increases, it will be refactored to enum
71    pub task_type: PbTaskType,
72    /// Deprecated. use `table_vnode_partition` instead;
73    pub split_by_state_table: bool,
74    /// Compaction needs to cut the state table every time 1/weight of vnodes in the table have been processed.
75    /// Deprecated. use `table_vnode_partition` instead;
76    pub split_weight_by_vnode: u32,
77    pub table_vnode_partition: BTreeMap<TableId, u32>,
78    /// The table watermark of any table id. In compaction we only use the table watermarks on safe epoch,
79    /// so we only need to include the table watermarks on safe epoch to reduce the size of metadata.
80    pub pk_prefix_table_watermarks: BTreeMap<TableId, TableWatermarks>,
81
82    pub non_pk_prefix_table_watermarks: BTreeMap<TableId, TableWatermarks>,
83    pub value_table_watermarks: BTreeMap<TableId, TableWatermarks>,
84
85    pub table_schemas: BTreeMap<TableId, PbTableSchema>,
86
87    pub max_sub_compaction: u32,
88
89    pub blocked_xor_filter_kv_count_threshold: Option<u64>,
90
91    pub max_vnode_key_range_bytes: Option<u64>,
92
93    pub sstable_filter_type: PbSstableFilterType,
94
95    pub sstable_filter_layout: PbSstableFilterLayout,
96}
97
98#[derive(Clone, PartialEq, Default, Debug)]
99pub struct CompactTaskAssignment {
100    pub compact_task: CompactTask,
101    pub context_id: HummockContextId,
102}
103
104impl CompactTask {
105    pub fn is_trivial_move_task(&self) -> bool {
106        if self.task_type != TaskType::Dynamic && self.task_type != TaskType::Emergency {
107            return false;
108        }
109
110        if self.input_ssts.len() != 2 || self.input_ssts[0].level_type != LevelType::Nonoverlapping
111        {
112            return false;
113        }
114
115        // it may be a manual compaction task
116        if self.input_ssts[0].level_idx == self.input_ssts[1].level_idx
117            && self.input_ssts[0].level_idx > 0
118        {
119            return false;
120        }
121
122        if self.input_ssts[1].level_idx == self.target_level
123            && self.input_ssts[1].table_infos.is_empty()
124        {
125            return true;
126        }
127
128        false
129    }
130
131    pub fn task_label(&self) -> &'static str {
132        if self.is_trivial_reclaim() {
133            return "trivial-space-reclaim";
134        }
135
136        if self.is_trivial_move_task() {
137            return "trivial-move";
138        }
139
140        "normal"
141    }
142
143    pub fn is_trivial_reclaim(&self) -> bool {
144        // Currently all VnodeWatermark tasks are trivial reclaim.
145        if self.task_type == TaskType::VnodeWatermark {
146            return true;
147        }
148        self.input_ssts
149            .iter()
150            .flat_map(|level| level.table_infos.iter())
151            .all(|sst| sst.table_ids.is_empty())
152    }
153}
154
155impl CompactTask {
156    // The compact task may need to reclaim key with TTL
157    pub fn contains_ttl(&self) -> bool {
158        self.table_options
159            .iter()
160            .any(|(_, table_option)| table_option.retention_seconds.is_some_and(|ttl| ttl > 0))
161    }
162
163    // The compact task may need to reclaim key with range tombstone
164    pub fn contains_range_tombstone(&self) -> bool {
165        self.read_input_ssts()
166            .any(|sst| sst.range_tombstone_count > 0)
167    }
168
169    // The compact task may need to reclaim key with split sst
170    pub fn contains_split_sst(&self) -> bool {
171        self.read_input_ssts()
172            .any(|sst| sst.sst_id.as_raw_id() != sst.object_id.as_raw_id())
173    }
174
175    /// Returns sorted and deduplicated table ids from this task's normalized input SST metadata.
176    pub fn get_table_ids_from_input_ssts(&self) -> impl Iterator<Item = StateTableId> + use<> {
177        self.input_ssts
178            .iter()
179            .flat_map(|level| level.table_infos.iter())
180            .flat_map(|sst| sst.table_ids.iter().copied())
181            .sorted()
182            .dedup()
183    }
184
185    pub fn is_expired(&self, compaction_group_version_id_expected: u64) -> bool {
186        is_compaction_task_expired(
187            self.compaction_group_version_id,
188            compaction_group_version_id_expected,
189        )
190    }
191
192    /// Resolves the physical filter layout for one output SST.
193    ///
194    /// The caller should pass an output-SST-level key-count estimate. This differs from the legacy
195    /// task-level check below: a large compaction task can produce many small output SSTs, and those
196    /// outputs should be allowed to use plain filters when each output is below the threshold.
197    pub fn sstable_filter_layout_for_output(
198        &self,
199        estimated_output_key_count: u64,
200    ) -> PbSstableFilterLayout {
201        match self.sstable_filter_layout {
202            PbSstableFilterLayout::Plain => PbSstableFilterLayout::Plain,
203            PbSstableFilterLayout::Blocked => PbSstableFilterLayout::Blocked,
204            PbSstableFilterLayout::Auto | PbSstableFilterLayout::Unspecified => {
205                if crate::filter_utils::should_use_blocked_xor_filter_by_kv_count(
206                    estimated_output_key_count,
207                    self.blocked_xor_filter_kv_count_threshold,
208                ) {
209                    PbSstableFilterLayout::Blocked
210                } else {
211                    PbSstableFilterLayout::Plain
212                }
213            }
214        }
215    }
216
217    /// Returns the effective vnode key-range hint limit (in bytes) for this compaction task.
218    ///
219    /// The hint is only meaningful when all input SSTs belong to a single table.
220    pub fn effective_max_vnode_key_range_bytes(&self) -> Option<usize> {
221        let limit = self.max_vnode_key_range_bytes.filter(|&v| v > 0)? as usize;
222        (self.get_table_ids_from_input_ssts().count() == 1).then_some(limit)
223    }
224
225    /// Returns input SSTs that should be read by the compactor.
226    ///
227    /// SST entries with empty `table_ids` are ignored defensively and should not be read by the
228    /// compactor.
229    pub fn read_input_ssts(&self) -> impl Iterator<Item = &SstableInfo> {
230        self.input_ssts
231            .iter()
232            .flat_map(|level| level.read_sstable_infos())
233    }
234}
235
236fn split_watermark_serde_types(
237    pb_compact_task: &PbCompactTask,
238) -> (
239    BTreeMap<TableId, TableWatermarks>,
240    BTreeMap<TableId, TableWatermarks>,
241    BTreeMap<TableId, TableWatermarks>,
242) {
243    let mut pk_prefix_table_watermarks = BTreeMap::default();
244    let mut non_pk_prefix_table_watermarks = BTreeMap::default();
245    let mut value_table_watermarks = BTreeMap::default();
246    for (table_id, pbwatermark) in &pb_compact_task.table_watermarks {
247        let watermark = TableWatermarks::from(pbwatermark);
248        match watermark.watermark_type {
249            WatermarkSerdeType::PkPrefix => {
250                pk_prefix_table_watermarks.insert(*table_id, watermark);
251            }
252            WatermarkSerdeType::NonPkPrefix => {
253                non_pk_prefix_table_watermarks.insert(*table_id, watermark);
254            }
255            WatermarkSerdeType::Value => {
256                value_table_watermarks.insert(*table_id, watermark);
257            }
258        }
259    }
260    (
261        pk_prefix_table_watermarks,
262        non_pk_prefix_table_watermarks,
263        value_table_watermarks,
264    )
265}
266
/// Returns `true` when the compaction group version id recorded in a task differs from the
/// expected version id.
///
/// Any mismatch counts as expired, whether the recorded version is older or newer.
pub fn is_compaction_task_expired(
    compaction_group_version_id_in_task: u64,
    compaction_group_version_id_expected: u64,
) -> bool {
    let versions_match =
        compaction_group_version_id_in_task == compaction_group_version_id_expected;
    !versions_match
}
273
274impl From<PbCompactTask> for CompactTask {
275    fn from(pb_compact_task: PbCompactTask) -> Self {
276        let (pk_prefix_table_watermarks, non_pk_prefix_table_watermarks, value_table_watermarks) =
277            split_watermark_serde_types(&pb_compact_task);
278        #[expect(deprecated)]
279        Self {
280            input_ssts: pb_compact_task
281                .input_ssts
282                .into_iter()
283                .map(InputLevel::from)
284                .collect_vec(),
285            splits: pb_compact_task
286                .splits
287                .into_iter()
288                .map(|pb_keyrange| KeyRange {
289                    left: pb_keyrange.left.into(),
290                    right: pb_keyrange.right.into(),
291                    right_exclusive: pb_keyrange.right_exclusive,
292                })
293                .collect_vec(),
294            sorted_output_ssts: pb_compact_task
295                .sorted_output_ssts
296                .into_iter()
297                .map(SstableInfo::from)
298                .collect_vec(),
299            task_id: pb_compact_task.task_id,
300            target_level: pb_compact_task.target_level,
301            gc_delete_keys: pb_compact_task.gc_delete_keys,
302            base_level: pb_compact_task.base_level,
303            task_status: TaskStatus::try_from(pb_compact_task.task_status).unwrap(),
304            compaction_group_id: pb_compact_task.compaction_group_id,
305            existing_table_ids: pb_compact_task.existing_table_ids.clone(),
306            compression_algorithm: pb_compact_task.compression_algorithm,
307            target_file_size: pb_compact_task.target_file_size,
308            compaction_filter_mask: pb_compact_task.compaction_filter_mask,
309            table_options: pb_compact_task
310                .table_options
311                .iter()
312                .map(|(table_id, v)| (*table_id, *v))
313                .collect(),
314            current_epoch_time: pb_compact_task.current_epoch_time,
315            target_sub_level_id: pb_compact_task.target_sub_level_id,
316            task_type: PbTaskType::try_from(pb_compact_task.task_type).unwrap(),
317            split_by_state_table: pb_compact_task.split_by_state_table,
318            split_weight_by_vnode: pb_compact_task.split_weight_by_vnode,
319            table_vnode_partition: pb_compact_task
320                .table_vnode_partition
321                .iter()
322                .map(|(table_id, v)| (*table_id, *v))
323                .collect(),
324            pk_prefix_table_watermarks,
325            non_pk_prefix_table_watermarks,
326            value_table_watermarks,
327            table_schemas: pb_compact_task
328                .table_schemas
329                .iter()
330                .map(|(table_id, v)| (*table_id, v.clone()))
331                .collect(),
332            max_sub_compaction: pb_compact_task.max_sub_compaction,
333            compaction_group_version_id: pb_compact_task.compaction_group_version_id,
334            blocked_xor_filter_kv_count_threshold: pb_compact_task.max_kv_count_for_xor16,
335            max_vnode_key_range_bytes: pb_compact_task.max_vnode_key_range_bytes,
336            sstable_filter_type: PbSstableFilterType::try_from(pb_compact_task.sstable_filter_type)
337                .unwrap_or(PbSstableFilterType::SstableFilterUnspecified),
338            sstable_filter_layout: PbSstableFilterLayout::try_from(
339                pb_compact_task.sstable_filter_layout,
340            )
341            .unwrap_or(PbSstableFilterLayout::Auto),
342        }
343    }
344}
345
346impl From<&PbCompactTask> for CompactTask {
347    fn from(pb_compact_task: &PbCompactTask) -> Self {
348        let (pk_prefix_table_watermarks, non_pk_prefix_table_watermarks, value_table_watermarks) =
349            split_watermark_serde_types(pb_compact_task);
350        #[expect(deprecated)]
351        Self {
352            input_ssts: pb_compact_task
353                .input_ssts
354                .iter()
355                .map(InputLevel::from)
356                .collect_vec(),
357            splits: pb_compact_task
358                .splits
359                .iter()
360                .map(|pb_keyrange| KeyRange {
361                    left: pb_keyrange.left.clone().into(),
362                    right: pb_keyrange.right.clone().into(),
363                    right_exclusive: pb_keyrange.right_exclusive,
364                })
365                .collect_vec(),
366            sorted_output_ssts: pb_compact_task
367                .sorted_output_ssts
368                .iter()
369                .map(SstableInfo::from)
370                .collect_vec(),
371            task_id: pb_compact_task.task_id,
372            target_level: pb_compact_task.target_level,
373            gc_delete_keys: pb_compact_task.gc_delete_keys,
374            base_level: pb_compact_task.base_level,
375            task_status: TaskStatus::try_from(pb_compact_task.task_status).unwrap(),
376            compaction_group_id: pb_compact_task.compaction_group_id,
377            existing_table_ids: pb_compact_task.existing_table_ids.clone(),
378            compression_algorithm: pb_compact_task.compression_algorithm,
379            target_file_size: pb_compact_task.target_file_size,
380            compaction_filter_mask: pb_compact_task.compaction_filter_mask,
381            table_options: pb_compact_task
382                .table_options
383                .iter()
384                .map(|(table_id, v)| (*table_id, *v))
385                .collect(),
386            current_epoch_time: pb_compact_task.current_epoch_time,
387            target_sub_level_id: pb_compact_task.target_sub_level_id,
388            task_type: PbTaskType::try_from(pb_compact_task.task_type).unwrap(),
389            split_by_state_table: pb_compact_task.split_by_state_table,
390            split_weight_by_vnode: pb_compact_task.split_weight_by_vnode,
391            table_vnode_partition: pb_compact_task
392                .table_vnode_partition
393                .iter()
394                .map(|(table_id, v)| (*table_id, *v))
395                .collect(),
396            pk_prefix_table_watermarks,
397            non_pk_prefix_table_watermarks,
398            value_table_watermarks,
399            table_schemas: pb_compact_task
400                .table_schemas
401                .iter()
402                .map(|(table_id, v)| (*table_id, v.clone()))
403                .collect(),
404            max_sub_compaction: pb_compact_task.max_sub_compaction,
405            compaction_group_version_id: pb_compact_task.compaction_group_version_id,
406            blocked_xor_filter_kv_count_threshold: pb_compact_task.max_kv_count_for_xor16,
407            max_vnode_key_range_bytes: pb_compact_task.max_vnode_key_range_bytes,
408            sstable_filter_type: PbSstableFilterType::try_from(pb_compact_task.sstable_filter_type)
409                .unwrap_or(PbSstableFilterType::SstableFilterUnspecified),
410            sstable_filter_layout: PbSstableFilterLayout::try_from(
411                pb_compact_task.sstable_filter_layout,
412            )
413            .unwrap_or(PbSstableFilterLayout::Auto),
414        }
415    }
416}
417
418impl From<CompactTask> for PbCompactTask {
419    fn from(compact_task: CompactTask) -> Self {
420        #[expect(deprecated)]
421        Self {
422            input_ssts: compact_task
423                .input_ssts
424                .into_iter()
425                .map(|input_level| input_level.into())
426                .collect_vec(),
427            splits: compact_task
428                .splits
429                .into_iter()
430                .map(|keyrange| PbKeyRange {
431                    left: keyrange.left.into(),
432                    right: keyrange.right.into(),
433                    right_exclusive: keyrange.right_exclusive,
434                })
435                .collect_vec(),
436            sorted_output_ssts: compact_task
437                .sorted_output_ssts
438                .into_iter()
439                .map(|sst| sst.into())
440                .collect_vec(),
441            task_id: compact_task.task_id,
442            target_level: compact_task.target_level,
443            gc_delete_keys: compact_task.gc_delete_keys,
444            base_level: compact_task.base_level,
445            task_status: compact_task.task_status.into(),
446            compaction_group_id: compact_task.compaction_group_id,
447            existing_table_ids: compact_task.existing_table_ids.clone(),
448            compression_algorithm: compact_task.compression_algorithm,
449            target_file_size: compact_task.target_file_size,
450            compaction_filter_mask: compact_task.compaction_filter_mask,
451            table_options: compact_task.table_options.clone(),
452            current_epoch_time: compact_task.current_epoch_time,
453            target_sub_level_id: compact_task.target_sub_level_id,
454            task_type: compact_task.task_type.into(),
455            split_weight_by_vnode: compact_task.split_weight_by_vnode,
456            table_vnode_partition: compact_task.table_vnode_partition.clone(),
457            table_watermarks: compact_task
458                .pk_prefix_table_watermarks
459                .into_iter()
460                .chain(compact_task.non_pk_prefix_table_watermarks)
461                .chain(compact_task.value_table_watermarks)
462                .map(|(table_id, table_watermark)| (table_id, table_watermark.into()))
463                .collect(),
464            split_by_state_table: compact_task.split_by_state_table,
465            table_schemas: compact_task.table_schemas.clone(),
466            max_sub_compaction: compact_task.max_sub_compaction,
467            compaction_group_version_id: compact_task.compaction_group_version_id,
468            max_kv_count_for_xor16: compact_task.blocked_xor_filter_kv_count_threshold,
469            max_vnode_key_range_bytes: compact_task.max_vnode_key_range_bytes,
470            sstable_filter_type: compact_task.sstable_filter_type.into(),
471            sstable_filter_layout: compact_task.sstable_filter_layout.into(),
472        }
473    }
474}
475
476impl From<&CompactTask> for PbCompactTask {
477    fn from(compact_task: &CompactTask) -> Self {
478        #[expect(deprecated)]
479        Self {
480            input_ssts: compact_task
481                .input_ssts
482                .iter()
483                .map(|input_level| input_level.into())
484                .collect_vec(),
485            splits: compact_task
486                .splits
487                .iter()
488                .map(|keyrange| PbKeyRange {
489                    left: keyrange.left.to_vec(),
490                    right: keyrange.right.to_vec(),
491                    right_exclusive: keyrange.right_exclusive,
492                })
493                .collect_vec(),
494            sorted_output_ssts: compact_task
495                .sorted_output_ssts
496                .iter()
497                .map(|sst| sst.into())
498                .collect_vec(),
499            task_id: compact_task.task_id,
500            target_level: compact_task.target_level,
501            gc_delete_keys: compact_task.gc_delete_keys,
502            base_level: compact_task.base_level,
503            task_status: compact_task.task_status.into(),
504            compaction_group_id: compact_task.compaction_group_id,
505            existing_table_ids: compact_task.existing_table_ids.clone(),
506            compression_algorithm: compact_task.compression_algorithm,
507            target_file_size: compact_task.target_file_size,
508            compaction_filter_mask: compact_task.compaction_filter_mask,
509            table_options: compact_task.table_options.clone(),
510            current_epoch_time: compact_task.current_epoch_time,
511            target_sub_level_id: compact_task.target_sub_level_id,
512            task_type: compact_task.task_type.into(),
513            split_weight_by_vnode: compact_task.split_weight_by_vnode,
514            table_vnode_partition: compact_task.table_vnode_partition.clone(),
515            table_watermarks: compact_task
516                .pk_prefix_table_watermarks
517                .iter()
518                .chain(compact_task.non_pk_prefix_table_watermarks.iter())
519                .chain(compact_task.value_table_watermarks.iter())
520                .map(|(table_id, table_watermark)| (*table_id, table_watermark.into()))
521                .collect(),
522            split_by_state_table: compact_task.split_by_state_table,
523            table_schemas: compact_task.table_schemas.clone(),
524            max_sub_compaction: compact_task.max_sub_compaction,
525            compaction_group_version_id: compact_task.compaction_group_version_id,
526            max_kv_count_for_xor16: compact_task.blocked_xor_filter_kv_count_threshold,
527            max_vnode_key_range_bytes: compact_task.max_vnode_key_range_bytes,
528            sstable_filter_type: compact_task.sstable_filter_type.into(),
529            sstable_filter_layout: compact_task.sstable_filter_layout.into(),
530        }
531    }
532}
533
534impl From<PbCompactTaskAssignment> for CompactTaskAssignment {
535    fn from(assignment: PbCompactTaskAssignment) -> Self {
536        Self {
537            compact_task: CompactTask::from(assignment.compact_task.unwrap()),
538            context_id: assignment.context_id,
539        }
540    }
541}
542
543impl From<&PbCompactTaskAssignment> for CompactTaskAssignment {
544    fn from(assignment: &PbCompactTaskAssignment) -> Self {
545        Self {
546            compact_task: CompactTask::from(assignment.compact_task.as_ref().unwrap()),
547            context_id: assignment.context_id,
548        }
549    }
550}
551
552impl From<CompactTaskAssignment> for PbCompactTaskAssignment {
553    fn from(assignment: CompactTaskAssignment) -> Self {
554        Self {
555            compact_task: Some(assignment.compact_task.into()),
556            context_id: assignment.context_id,
557        }
558    }
559}
560
561impl From<&CompactTaskAssignment> for PbCompactTaskAssignment {
562    fn from(assignment: &CompactTaskAssignment) -> Self {
563        Self {
564            compact_task: Some((&assignment.compact_task).into()),
565            context_id: assignment.context_id,
566        }
567    }
568}
569
570#[derive(Clone, PartialEq, Default)]
571pub struct ValidationTask {
572    pub sst_infos: Vec<SstableInfo>,
573    pub sst_id_to_worker_id: HashMap<HummockSstableObjectId, WorkerId>,
574}
575
576impl From<PbValidationTask> for ValidationTask {
577    fn from(pb_validation_task: PbValidationTask) -> Self {
578        Self {
579            sst_infos: pb_validation_task
580                .sst_infos
581                .into_iter()
582                .map(SstableInfo::from)
583                .collect_vec(),
584            sst_id_to_worker_id: pb_validation_task.sst_id_to_worker_id.into_iter().collect(),
585        }
586    }
587}
588
589impl From<ValidationTask> for PbValidationTask {
590    fn from(validation_task: ValidationTask) -> Self {
591        Self {
592            sst_infos: validation_task
593                .sst_infos
594                .into_iter()
595                .map(|sst| sst.into())
596                .collect_vec(),
597            sst_id_to_worker_id: validation_task.sst_id_to_worker_id,
598        }
599    }
600}
601
602#[derive(Clone, PartialEq, Default, Debug)]
603pub struct ReportTask {
604    pub table_stats_change: HashMap<TableId, PbTableStats>,
605    pub task_id: u64,
606    pub task_status: TaskStatus,
607    pub sorted_output_ssts: Vec<SstableInfo>,
608    pub object_timestamps: HashMap<HummockSstableObjectId, u64>,
609}
610
611impl From<PbReportTask> for ReportTask {
612    fn from(value: PbReportTask) -> Self {
613        Self {
614            table_stats_change: value.table_stats_change.clone(),
615            task_id: value.task_id,
616            task_status: PbTaskStatus::try_from(value.task_status).unwrap(),
617            sorted_output_ssts: value
618                .sorted_output_ssts
619                .into_iter()
620                .map(SstableInfo::from)
621                .collect_vec(),
622            object_timestamps: value.object_timestamps,
623        }
624    }
625}
626
627impl From<ReportTask> for PbReportTask {
628    fn from(value: ReportTask) -> Self {
629        Self {
630            table_stats_change: value.table_stats_change.clone(),
631            task_id: value.task_id,
632            task_status: value.task_status.into(),
633            sorted_output_ssts: value
634                .sorted_output_ssts
635                .into_iter()
636                .map(|sst| sst.into())
637                .collect_vec(),
638            object_timestamps: value.object_timestamps,
639        }
640    }
641}
642
#[cfg(test)]
mod tests {
    use risingwave_common::catalog::TableId;
    use risingwave_pb::hummock::compact_task::TaskType;
    use risingwave_pb::hummock::{PbCompactTask, PbLevelType, PbSstableFilterLayout};

    use super::CompactTask;
    use crate::level::InputLevel;
    use crate::sstable_info::{SstableInfo, SstableInfoInner};

    /// Builds a non-split SST (`sst_id == object_id`) that references `table_ids`.
    fn test_sstable(sst_id: u64, table_ids: Vec<TableId>) -> SstableInfo {
        SstableInfo::from(SstableInfoInner {
            object_id: sst_id.into(),
            sst_id: sst_id.into(),
            table_ids,
            ..Default::default()
        })
    }

    /// Builds a split SST (`sst_id != object_id`) that carries one range tombstone.
    fn test_read_property_sstable(table_ids: Vec<TableId>) -> SstableInfo {
        SstableInfo::from(SstableInfoInner {
            object_id: 1.into(),
            sst_id: 2.into(),
            table_ids,
            range_tombstone_count: 1,
            total_key_count: 100,
            ..Default::default()
        })
    }

    #[test]
    fn test_blocked_xor_filter_kv_count_threshold_roundtrip() {
        let pb = PbCompactTask {
            max_kv_count_for_xor16: Some(123),
            ..Default::default()
        };

        let task = CompactTask::from(&pb);
        assert_eq!(task.blocked_xor_filter_kv_count_threshold, Some(123));

        let pb2 = PbCompactTask::from(&task);
        assert_eq!(pb2.max_kv_count_for_xor16, Some(123));
    }

    #[test]
    fn test_sstable_filter_layout_for_output() {
        let task = CompactTask {
            sstable_filter_layout: PbSstableFilterLayout::Auto,
            blocked_xor_filter_kv_count_threshold: Some(100),
            ..Default::default()
        };

        assert_eq!(
            task.sstable_filter_layout_for_output(100),
            PbSstableFilterLayout::Plain
        );
        assert_eq!(
            task.sstable_filter_layout_for_output(101),
            PbSstableFilterLayout::Blocked
        );

        let task = CompactTask {
            sstable_filter_layout: PbSstableFilterLayout::Plain,
            blocked_xor_filter_kv_count_threshold: Some(100),
            ..Default::default()
        };

        assert_eq!(
            task.sstable_filter_layout_for_output(101),
            PbSstableFilterLayout::Plain
        );

        let task = CompactTask {
            sstable_filter_layout: PbSstableFilterLayout::Blocked,
            blocked_xor_filter_kv_count_threshold: Some(100),
            ..Default::default()
        };

        assert_eq!(
            task.sstable_filter_layout_for_output(0),
            PbSstableFilterLayout::Blocked
        );
    }

    #[test]
    fn test_empty_table_ids_are_reclaim_and_have_no_input_table_ids() {
        let task = CompactTask {
            input_ssts: vec![InputLevel {
                table_infos: vec![test_sstable(1, vec![])],
                ..Default::default()
            }],
            existing_table_ids: vec![TableId::new(1)],
            ..Default::default()
        };

        assert!(task.get_table_ids_from_input_ssts().next().is_none());
        assert!(task.is_trivial_reclaim());
    }

    #[test]
    fn test_read_properties_ignore_empty_table_ids() {
        let task = CompactTask {
            input_ssts: vec![InputLevel {
                table_infos: vec![
                    test_read_property_sstable(vec![]),
                    SstableInfo::from(SstableInfoInner {
                        object_id: 3.into(),
                        sst_id: 3.into(),
                        table_ids: vec![TableId::new(1)],
                        total_key_count: 1,
                        ..Default::default()
                    }),
                ],
                ..Default::default()
            }],
            ..Default::default()
        };

        assert!(!task.contains_range_tombstone());
        assert!(!task.contains_split_sst());

        let task = CompactTask {
            input_ssts: vec![InputLevel {
                table_infos: vec![test_read_property_sstable(vec![TableId::new(1)])],
                ..Default::default()
            }],
            ..Default::default()
        };

        assert!(task.contains_range_tombstone());
        assert!(task.contains_split_sst());
    }

    #[test]
    fn test_task_label_for_trivial_move_and_reclaim() {
        let trivial_move_task = CompactTask {
            input_ssts: vec![
                InputLevel {
                    level_idx: 1,
                    level_type: PbLevelType::Nonoverlapping,
                    table_infos: vec![
                        test_sstable(10, vec![TableId::new(1)]),
                        test_sstable(11, vec![]),
                    ],
                },
                InputLevel {
                    level_idx: 2,
                    level_type: PbLevelType::Nonoverlapping,
                    table_infos: vec![],
                },
            ],
            target_level: 2,
            task_type: TaskType::Dynamic,
            ..Default::default()
        };

        assert!(!trivial_move_task.is_trivial_reclaim());
        assert_eq!(trivial_move_task.task_label(), "trivial-move");

        let trivial_reclaim_task = CompactTask {
            input_ssts: vec![
                InputLevel {
                    level_idx: 1,
                    level_type: PbLevelType::Nonoverlapping,
                    table_infos: vec![test_sstable(10, vec![])],
                },
                InputLevel {
                    level_idx: 2,
                    level_type: PbLevelType::Nonoverlapping,
                    table_infos: vec![],
                },
            ],
            target_level: 2,
            task_type: TaskType::Dynamic,
            ..Default::default()
        };

        assert!(trivial_reclaim_task.is_trivial_reclaim());
        assert!(trivial_reclaim_task.is_trivial_move_task());
        assert_eq!(trivial_reclaim_task.task_label(), "trivial-space-reclaim");
    }

    #[test]
    fn test_manual_compaction_is_not_trivial_move() {
        // Both inputs on the same non-zero level: treated as a manual compaction.
        let task = CompactTask {
            input_ssts: vec![
                InputLevel {
                    level_idx: 2,
                    level_type: PbLevelType::Nonoverlapping,
                    table_infos: vec![test_sstable(10, vec![TableId::new(1)])],
                },
                InputLevel {
                    level_idx: 2,
                    level_type: PbLevelType::Nonoverlapping,
                    table_infos: vec![],
                },
            ],
            target_level: 2,
            task_type: TaskType::Dynamic,
            ..Default::default()
        };

        assert!(!task.is_trivial_move_task());
        assert!(!task.is_trivial_reclaim());
        assert_eq!(task.task_label(), "normal");
    }

    #[test]
    fn test_vnode_watermark_task_is_trivial_reclaim() {
        let task = CompactTask {
            input_ssts: vec![InputLevel {
                table_infos: vec![test_sstable(1, vec![TableId::new(1)])],
                ..Default::default()
            }],
            task_type: TaskType::VnodeWatermark,
            ..Default::default()
        };

        assert!(task.is_trivial_reclaim());
        assert_eq!(task.task_label(), "trivial-space-reclaim");
    }

    #[test]
    fn test_effective_max_vnode_key_range_bytes() {
        let single_table_task = CompactTask {
            input_ssts: vec![InputLevel {
                table_infos: vec![
                    test_sstable(1, vec![TableId::new(1)]),
                    test_sstable(2, vec![TableId::new(1)]),
                ],
                ..Default::default()
            }],
            max_vnode_key_range_bytes: Some(1024),
            ..Default::default()
        };
        assert_eq!(
            single_table_task.effective_max_vnode_key_range_bytes(),
            Some(1024)
        );

        // A zero or missing limit disables the hint.
        let zero_limit_task = CompactTask {
            max_vnode_key_range_bytes: Some(0),
            ..single_table_task.clone()
        };
        assert_eq!(zero_limit_task.effective_max_vnode_key_range_bytes(), None);
        let no_limit_task = CompactTask {
            max_vnode_key_range_bytes: None,
            ..single_table_task.clone()
        };
        assert_eq!(no_limit_task.effective_max_vnode_key_range_bytes(), None);

        // Inputs spanning more than one table never get the hint.
        let multi_table_task = CompactTask {
            input_ssts: vec![InputLevel {
                table_infos: vec![
                    test_sstable(1, vec![TableId::new(1)]),
                    test_sstable(2, vec![TableId::new(2)]),
                ],
                ..Default::default()
            }],
            max_vnode_key_range_bytes: Some(1024),
            ..Default::default()
        };
        assert_eq!(multi_table_task.effective_max_vnode_key_range_bytes(), None);

        // Without any input table id there is nothing to apply the hint to.
        let empty_task = CompactTask {
            max_vnode_key_range_bytes: Some(1024),
            ..Default::default()
        };
        assert_eq!(empty_task.effective_max_vnode_key_range_bytes(), None);
    }
}