risingwave_hummock_sdk/
compact_task.rs

1// Copyright 2024 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, HashMap};
16use std::mem::size_of;
17
18use itertools::Itertools;
19use risingwave_common::catalog::TableId;
20use risingwave_pb::hummock::compact_task::{PbTaskStatus, PbTaskType, TaskStatus, TaskType};
21use risingwave_pb::hummock::subscribe_compaction_event_request::PbReportTask;
22use risingwave_pb::hummock::{
23    LevelType, PbCompactTask, PbKeyRange, PbTableOption, PbTableSchema, PbTableStats,
24    PbValidationTask,
25};
26use risingwave_pb::id::WorkerId;
27
28use crate::compaction_group::StateTableId;
29use crate::key_range::KeyRange;
30use crate::level::InputLevel;
31use crate::sstable_info::SstableInfo;
32use crate::table_watermark::{TableWatermarks, WatermarkSerdeType};
33use crate::{CompactionGroupId, HummockSstableObjectId};
34
35#[derive(Clone, PartialEq, Default, Debug)]
36pub struct CompactTask {
37    /// SSTs selected by meta, which will be removed from LSM after compaction succeeds.
38    ///
39    /// Each SST's `table_ids` comes from version metadata, where truncate/drop table deltas prune
40    /// table ids before later compaction tasks are picked.
41    pub input_ssts: Vec<InputLevel>,
42    /// In ideal case, the compaction will generate `splits.len()` tables which have key range
43    /// corresponding to that in `splits`, respectively
44    pub splits: Vec<KeyRange>,
45    /// compaction output, which will be added to `target_level` of LSM after compaction
46    pub sorted_output_ssts: Vec<SstableInfo>,
47    /// task id assigned by hummock storage service
48    pub task_id: u64,
49    /// compaction output will be added to `target_level` of LSM after compaction
50    pub target_level: u32,
51    pub gc_delete_keys: bool,
52    /// Lbase in LSM
53    pub base_level: u32,
54    pub task_status: PbTaskStatus,
55    /// compaction group the task belongs to.
56    pub compaction_group_id: CompactionGroupId,
57    /// compaction group id when the compaction task is created
58    pub compaction_group_version_id: u64,
59    /// Table ids that still exist in the version when the compaction task is picked.
60    ///
61    /// This may be broader than the table ids touched by this task's input SSTs. Task-local table
62    /// ids should be read from `input_ssts[*].table_ids`.
63    pub existing_table_ids: Vec<TableId>,
64    pub compression_algorithm: u32,
65    pub target_file_size: u64,
66    pub compaction_filter_mask: u32,
67    pub table_options: BTreeMap<TableId, PbTableOption>,
68    pub current_epoch_time: u64,
69    pub target_sub_level_id: u64,
70    /// Identifies whether the task is `space_reclaim`, if the `compact_task_type` increases, it will be refactored to enum
71    pub task_type: PbTaskType,
72    /// Deprecated. use `table_vnode_partition` instead;
73    pub split_by_state_table: bool,
74    /// Compaction needs to cut the state table every time 1/weight of vnodes in the table have been processed.
75    /// Deprecated. use `table_vnode_partition` instead;
76    pub split_weight_by_vnode: u32,
77    pub table_vnode_partition: BTreeMap<TableId, u32>,
78    /// The table watermark of any table id. In compaction we only use the table watermarks on safe epoch,
79    /// so we only need to include the table watermarks on safe epoch to reduce the size of metadata.
80    pub pk_prefix_table_watermarks: BTreeMap<TableId, TableWatermarks>,
81
82    pub non_pk_prefix_table_watermarks: BTreeMap<TableId, TableWatermarks>,
83    pub value_table_watermarks: BTreeMap<TableId, TableWatermarks>,
84
85    pub table_schemas: BTreeMap<TableId, PbTableSchema>,
86
87    pub max_sub_compaction: u32,
88
89    pub max_kv_count_for_xor16: Option<u64>,
90
91    pub max_vnode_key_range_bytes: Option<u64>,
92}
93
94impl CompactTask {
95    pub fn estimated_encode_len(&self) -> usize {
96        self.input_ssts
97            .iter()
98            .map(|input_level| input_level.estimated_encode_len())
99            .sum::<usize>()
100            + self
101                .splits
102                .iter()
103                .map(|split| split.left.len() + split.right.len() + size_of::<bool>())
104                .sum::<usize>()
105            + size_of::<u64>()
106            + self
107                .sorted_output_ssts
108                .iter()
109                .map(|sst| sst.estimated_encode_len())
110                .sum::<usize>()
111            + size_of::<u64>()
112            + size_of::<u32>()
113            + size_of::<bool>()
114            + size_of::<u32>()
115            + size_of::<i32>()
116            + size_of::<u64>()
117            + self.existing_table_ids.len() * size_of::<u32>()
118            + size_of::<u32>()
119            + size_of::<u64>()
120            + size_of::<u32>()
121            + self.table_options.len() * size_of::<u64>()
122            + size_of::<u64>()
123            + size_of::<u64>()
124            + size_of::<i32>()
125            + size_of::<bool>()
126            + size_of::<u32>()
127            + self.table_vnode_partition.len() * size_of::<u64>()
128            + self
129                .pk_prefix_table_watermarks
130                .values()
131                .map(|table_watermark| size_of::<u32>() + table_watermark.estimated_encode_len())
132                .sum::<usize>()
133            + self
134                .non_pk_prefix_table_watermarks
135                .values()
136                .map(|table_watermark| size_of::<u32>() + table_watermark.estimated_encode_len())
137                .sum::<usize>()
138            + self
139                .max_vnode_key_range_bytes
140                .map(|_| size_of::<u32>())
141                .unwrap_or_default()
142            + self
143                .value_table_watermarks
144                .values()
145                .map(|table_watermark| size_of::<u32>() + table_watermark.estimated_encode_len())
146                .sum::<usize>()
147    }
148
149    pub fn is_trivial_move_task(&self) -> bool {
150        if self.task_type != TaskType::Dynamic && self.task_type != TaskType::Emergency {
151            return false;
152        }
153
154        if self.input_ssts.len() != 2 || self.input_ssts[0].level_type != LevelType::Nonoverlapping
155        {
156            return false;
157        }
158
159        // it may be a manual compaction task
160        if self.input_ssts[0].level_idx == self.input_ssts[1].level_idx
161            && self.input_ssts[0].level_idx > 0
162        {
163            return false;
164        }
165
166        if self.input_ssts[1].level_idx == self.target_level
167            && self.input_ssts[1].table_infos.is_empty()
168        {
169            return true;
170        }
171
172        false
173    }
174
175    pub fn task_label(&self) -> &'static str {
176        if self.is_trivial_reclaim() {
177            return "trivial-space-reclaim";
178        }
179
180        if self.is_trivial_move_task() {
181            return "trivial-move";
182        }
183
184        "normal"
185    }
186
187    pub fn is_trivial_reclaim(&self) -> bool {
188        // Currently all VnodeWatermark tasks are trivial reclaim.
189        if self.task_type == TaskType::VnodeWatermark {
190            return true;
191        }
192        self.input_ssts
193            .iter()
194            .flat_map(|level| level.table_infos.iter())
195            .all(|sst| sst.table_ids.is_empty())
196    }
197}
198
199impl CompactTask {
200    // The compact task may need to reclaim key with TTL
201    pub fn contains_ttl(&self) -> bool {
202        self.table_options
203            .iter()
204            .any(|(_, table_option)| table_option.retention_seconds.is_some_and(|ttl| ttl > 0))
205    }
206
207    // The compact task may need to reclaim key with range tombstone
208    pub fn contains_range_tombstone(&self) -> bool {
209        self.read_input_ssts()
210            .any(|sst| sst.range_tombstone_count > 0)
211    }
212
213    // The compact task may need to reclaim key with split sst
214    pub fn contains_split_sst(&self) -> bool {
215        self.read_input_ssts()
216            .any(|sst| sst.sst_id.as_raw_id() != sst.object_id.as_raw_id())
217    }
218
219    /// Returns sorted and deduplicated table ids from this task's normalized input SST metadata.
220    pub fn get_table_ids_from_input_ssts(&self) -> impl Iterator<Item = StateTableId> + use<> {
221        self.input_ssts
222            .iter()
223            .flat_map(|level| level.table_infos.iter())
224            .flat_map(|sst| sst.table_ids.iter().copied())
225            .sorted()
226            .dedup()
227    }
228
229    pub fn is_expired(&self, compaction_group_version_id_expected: u64) -> bool {
230        is_compaction_task_expired(
231            self.compaction_group_version_id,
232            compaction_group_version_id_expected,
233        )
234    }
235
236    /// Determines whether to use block-based filter for this compaction task.
237    /// Returns true if the total key count exceeds the configured threshold.
238    pub fn should_use_block_based_filter(&self) -> bool {
239        let kv_count = self
240            .read_input_ssts()
241            .map(|sst| sst.total_key_count)
242            .sum::<u64>();
243
244        crate::filter_utils::is_kv_count_too_large_for_xor16(kv_count, self.max_kv_count_for_xor16)
245    }
246
247    /// Returns the effective vnode key-range hint limit (in bytes) for this compaction task.
248    ///
249    /// The hint is only meaningful when all input SSTs belong to a single table.
250    pub fn effective_max_vnode_key_range_bytes(&self) -> Option<usize> {
251        let limit = self.max_vnode_key_range_bytes.filter(|&v| v > 0)? as usize;
252        (self.get_table_ids_from_input_ssts().count() == 1).then_some(limit)
253    }
254
255    /// Returns input SSTs that should be read by the compactor.
256    ///
257    /// SST entries with empty `table_ids` are ignored defensively and should not be read by the
258    /// compactor.
259    pub fn read_input_ssts(&self) -> impl Iterator<Item = &SstableInfo> {
260        self.input_ssts
261            .iter()
262            .flat_map(|level| level.read_sstable_infos())
263    }
264}
265
266fn split_watermark_serde_types(
267    pb_compact_task: &PbCompactTask,
268) -> (
269    BTreeMap<TableId, TableWatermarks>,
270    BTreeMap<TableId, TableWatermarks>,
271    BTreeMap<TableId, TableWatermarks>,
272) {
273    let mut pk_prefix_table_watermarks = BTreeMap::default();
274    let mut non_pk_prefix_table_watermarks = BTreeMap::default();
275    let mut value_table_watermarks = BTreeMap::default();
276    for (table_id, pbwatermark) in &pb_compact_task.table_watermarks {
277        let watermark = TableWatermarks::from(pbwatermark);
278        match watermark.watermark_type {
279            WatermarkSerdeType::PkPrefix => {
280                pk_prefix_table_watermarks.insert(*table_id, watermark);
281            }
282            WatermarkSerdeType::NonPkPrefix => {
283                non_pk_prefix_table_watermarks.insert(*table_id, watermark);
284            }
285            WatermarkSerdeType::Value => {
286                value_table_watermarks.insert(*table_id, watermark);
287            }
288        }
289    }
290    (
291        pk_prefix_table_watermarks,
292        non_pk_prefix_table_watermarks,
293        value_table_watermarks,
294    )
295}
296
/// Returns `true` when the compaction group version id recorded in a task no longer matches
/// the expected one.
pub fn is_compaction_task_expired(
    compaction_group_version_id_in_task: u64,
    compaction_group_version_id_expected: u64,
) -> bool {
    let versions_match =
        compaction_group_version_id_in_task == compaction_group_version_id_expected;
    !versions_match
}
303
304impl From<PbCompactTask> for CompactTask {
305    fn from(pb_compact_task: PbCompactTask) -> Self {
306        let (pk_prefix_table_watermarks, non_pk_prefix_table_watermarks, value_table_watermarks) =
307            split_watermark_serde_types(&pb_compact_task);
308        #[expect(deprecated)]
309        Self {
310            input_ssts: pb_compact_task
311                .input_ssts
312                .into_iter()
313                .map(InputLevel::from)
314                .collect_vec(),
315            splits: pb_compact_task
316                .splits
317                .into_iter()
318                .map(|pb_keyrange| KeyRange {
319                    left: pb_keyrange.left.into(),
320                    right: pb_keyrange.right.into(),
321                    right_exclusive: pb_keyrange.right_exclusive,
322                })
323                .collect_vec(),
324            sorted_output_ssts: pb_compact_task
325                .sorted_output_ssts
326                .into_iter()
327                .map(SstableInfo::from)
328                .collect_vec(),
329            task_id: pb_compact_task.task_id,
330            target_level: pb_compact_task.target_level,
331            gc_delete_keys: pb_compact_task.gc_delete_keys,
332            base_level: pb_compact_task.base_level,
333            task_status: TaskStatus::try_from(pb_compact_task.task_status).unwrap(),
334            compaction_group_id: pb_compact_task.compaction_group_id,
335            existing_table_ids: pb_compact_task.existing_table_ids.clone(),
336            compression_algorithm: pb_compact_task.compression_algorithm,
337            target_file_size: pb_compact_task.target_file_size,
338            compaction_filter_mask: pb_compact_task.compaction_filter_mask,
339            table_options: pb_compact_task
340                .table_options
341                .iter()
342                .map(|(table_id, v)| (*table_id, *v))
343                .collect(),
344            current_epoch_time: pb_compact_task.current_epoch_time,
345            target_sub_level_id: pb_compact_task.target_sub_level_id,
346            task_type: PbTaskType::try_from(pb_compact_task.task_type).unwrap(),
347            split_by_state_table: pb_compact_task.split_by_state_table,
348            split_weight_by_vnode: pb_compact_task.split_weight_by_vnode,
349            table_vnode_partition: pb_compact_task
350                .table_vnode_partition
351                .iter()
352                .map(|(table_id, v)| (*table_id, *v))
353                .collect(),
354            pk_prefix_table_watermarks,
355            non_pk_prefix_table_watermarks,
356            value_table_watermarks,
357            table_schemas: pb_compact_task
358                .table_schemas
359                .iter()
360                .map(|(table_id, v)| (*table_id, v.clone()))
361                .collect(),
362            max_sub_compaction: pb_compact_task.max_sub_compaction,
363            compaction_group_version_id: pb_compact_task.compaction_group_version_id,
364            max_kv_count_for_xor16: pb_compact_task.max_kv_count_for_xor16,
365            max_vnode_key_range_bytes: pb_compact_task.max_vnode_key_range_bytes,
366        }
367    }
368}
369
370impl From<&PbCompactTask> for CompactTask {
371    fn from(pb_compact_task: &PbCompactTask) -> Self {
372        let (pk_prefix_table_watermarks, non_pk_prefix_table_watermarks, value_table_watermarks) =
373            split_watermark_serde_types(pb_compact_task);
374        #[expect(deprecated)]
375        Self {
376            input_ssts: pb_compact_task
377                .input_ssts
378                .iter()
379                .map(InputLevel::from)
380                .collect_vec(),
381            splits: pb_compact_task
382                .splits
383                .iter()
384                .map(|pb_keyrange| KeyRange {
385                    left: pb_keyrange.left.clone().into(),
386                    right: pb_keyrange.right.clone().into(),
387                    right_exclusive: pb_keyrange.right_exclusive,
388                })
389                .collect_vec(),
390            sorted_output_ssts: pb_compact_task
391                .sorted_output_ssts
392                .iter()
393                .map(SstableInfo::from)
394                .collect_vec(),
395            task_id: pb_compact_task.task_id,
396            target_level: pb_compact_task.target_level,
397            gc_delete_keys: pb_compact_task.gc_delete_keys,
398            base_level: pb_compact_task.base_level,
399            task_status: TaskStatus::try_from(pb_compact_task.task_status).unwrap(),
400            compaction_group_id: pb_compact_task.compaction_group_id,
401            existing_table_ids: pb_compact_task.existing_table_ids.clone(),
402            compression_algorithm: pb_compact_task.compression_algorithm,
403            target_file_size: pb_compact_task.target_file_size,
404            compaction_filter_mask: pb_compact_task.compaction_filter_mask,
405            table_options: pb_compact_task
406                .table_options
407                .iter()
408                .map(|(table_id, v)| (*table_id, *v))
409                .collect(),
410            current_epoch_time: pb_compact_task.current_epoch_time,
411            target_sub_level_id: pb_compact_task.target_sub_level_id,
412            task_type: PbTaskType::try_from(pb_compact_task.task_type).unwrap(),
413            split_by_state_table: pb_compact_task.split_by_state_table,
414            split_weight_by_vnode: pb_compact_task.split_weight_by_vnode,
415            table_vnode_partition: pb_compact_task
416                .table_vnode_partition
417                .iter()
418                .map(|(table_id, v)| (*table_id, *v))
419                .collect(),
420            pk_prefix_table_watermarks,
421            non_pk_prefix_table_watermarks,
422            value_table_watermarks,
423            table_schemas: pb_compact_task
424                .table_schemas
425                .iter()
426                .map(|(table_id, v)| (*table_id, v.clone()))
427                .collect(),
428            max_sub_compaction: pb_compact_task.max_sub_compaction,
429            compaction_group_version_id: pb_compact_task.compaction_group_version_id,
430            max_kv_count_for_xor16: pb_compact_task.max_kv_count_for_xor16,
431            max_vnode_key_range_bytes: pb_compact_task.max_vnode_key_range_bytes,
432        }
433    }
434}
435
436impl From<CompactTask> for PbCompactTask {
437    fn from(compact_task: CompactTask) -> Self {
438        #[expect(deprecated)]
439        Self {
440            input_ssts: compact_task
441                .input_ssts
442                .into_iter()
443                .map(|input_level| input_level.into())
444                .collect_vec(),
445            splits: compact_task
446                .splits
447                .into_iter()
448                .map(|keyrange| PbKeyRange {
449                    left: keyrange.left.into(),
450                    right: keyrange.right.into(),
451                    right_exclusive: keyrange.right_exclusive,
452                })
453                .collect_vec(),
454            sorted_output_ssts: compact_task
455                .sorted_output_ssts
456                .into_iter()
457                .map(|sst| sst.into())
458                .collect_vec(),
459            task_id: compact_task.task_id,
460            target_level: compact_task.target_level,
461            gc_delete_keys: compact_task.gc_delete_keys,
462            base_level: compact_task.base_level,
463            task_status: compact_task.task_status.into(),
464            compaction_group_id: compact_task.compaction_group_id,
465            existing_table_ids: compact_task.existing_table_ids.clone(),
466            compression_algorithm: compact_task.compression_algorithm,
467            target_file_size: compact_task.target_file_size,
468            compaction_filter_mask: compact_task.compaction_filter_mask,
469            table_options: compact_task.table_options.clone(),
470            current_epoch_time: compact_task.current_epoch_time,
471            target_sub_level_id: compact_task.target_sub_level_id,
472            task_type: compact_task.task_type.into(),
473            split_weight_by_vnode: compact_task.split_weight_by_vnode,
474            table_vnode_partition: compact_task.table_vnode_partition.clone(),
475            table_watermarks: compact_task
476                .pk_prefix_table_watermarks
477                .into_iter()
478                .chain(compact_task.non_pk_prefix_table_watermarks)
479                .chain(compact_task.value_table_watermarks)
480                .map(|(table_id, table_watermark)| (table_id, table_watermark.into()))
481                .collect(),
482            split_by_state_table: compact_task.split_by_state_table,
483            table_schemas: compact_task.table_schemas.clone(),
484            max_sub_compaction: compact_task.max_sub_compaction,
485            compaction_group_version_id: compact_task.compaction_group_version_id,
486            max_kv_count_for_xor16: compact_task.max_kv_count_for_xor16,
487            max_vnode_key_range_bytes: compact_task.max_vnode_key_range_bytes,
488        }
489    }
490}
491
492impl From<&CompactTask> for PbCompactTask {
493    fn from(compact_task: &CompactTask) -> Self {
494        #[expect(deprecated)]
495        Self {
496            input_ssts: compact_task
497                .input_ssts
498                .iter()
499                .map(|input_level| input_level.into())
500                .collect_vec(),
501            splits: compact_task
502                .splits
503                .iter()
504                .map(|keyrange| PbKeyRange {
505                    left: keyrange.left.to_vec(),
506                    right: keyrange.right.to_vec(),
507                    right_exclusive: keyrange.right_exclusive,
508                })
509                .collect_vec(),
510            sorted_output_ssts: compact_task
511                .sorted_output_ssts
512                .iter()
513                .map(|sst| sst.into())
514                .collect_vec(),
515            task_id: compact_task.task_id,
516            target_level: compact_task.target_level,
517            gc_delete_keys: compact_task.gc_delete_keys,
518            base_level: compact_task.base_level,
519            task_status: compact_task.task_status.into(),
520            compaction_group_id: compact_task.compaction_group_id,
521            existing_table_ids: compact_task.existing_table_ids.clone(),
522            compression_algorithm: compact_task.compression_algorithm,
523            target_file_size: compact_task.target_file_size,
524            compaction_filter_mask: compact_task.compaction_filter_mask,
525            table_options: compact_task.table_options.clone(),
526            current_epoch_time: compact_task.current_epoch_time,
527            target_sub_level_id: compact_task.target_sub_level_id,
528            task_type: compact_task.task_type.into(),
529            split_weight_by_vnode: compact_task.split_weight_by_vnode,
530            table_vnode_partition: compact_task.table_vnode_partition.clone(),
531            table_watermarks: compact_task
532                .pk_prefix_table_watermarks
533                .iter()
534                .chain(compact_task.non_pk_prefix_table_watermarks.iter())
535                .chain(compact_task.value_table_watermarks.iter())
536                .map(|(table_id, table_watermark)| (*table_id, table_watermark.into()))
537                .collect(),
538            split_by_state_table: compact_task.split_by_state_table,
539            table_schemas: compact_task.table_schemas.clone(),
540            max_sub_compaction: compact_task.max_sub_compaction,
541            compaction_group_version_id: compact_task.compaction_group_version_id,
542            max_kv_count_for_xor16: compact_task.max_kv_count_for_xor16,
543            max_vnode_key_range_bytes: compact_task.max_vnode_key_range_bytes,
544        }
545    }
546}
547
548#[derive(Clone, PartialEq, Default)]
549pub struct ValidationTask {
550    pub sst_infos: Vec<SstableInfo>,
551    pub sst_id_to_worker_id: HashMap<HummockSstableObjectId, WorkerId>,
552}
553
554impl From<PbValidationTask> for ValidationTask {
555    fn from(pb_validation_task: PbValidationTask) -> Self {
556        Self {
557            sst_infos: pb_validation_task
558                .sst_infos
559                .into_iter()
560                .map(SstableInfo::from)
561                .collect_vec(),
562            sst_id_to_worker_id: pb_validation_task.sst_id_to_worker_id.into_iter().collect(),
563        }
564    }
565}
566
567impl From<ValidationTask> for PbValidationTask {
568    fn from(validation_task: ValidationTask) -> Self {
569        Self {
570            sst_infos: validation_task
571                .sst_infos
572                .into_iter()
573                .map(|sst| sst.into())
574                .collect_vec(),
575            sst_id_to_worker_id: validation_task.sst_id_to_worker_id,
576        }
577    }
578}
579
580impl ValidationTask {
581    pub fn estimated_encode_len(&self) -> usize {
582        self.sst_infos
583            .iter()
584            .map(|sst| sst.estimated_encode_len())
585            .sum::<usize>()
586            + self.sst_id_to_worker_id.len() * (size_of::<u64>() + size_of::<u32>())
587            + size_of::<u64>()
588    }
589}
590
591#[derive(Clone, PartialEq, Default, Debug)]
592pub struct ReportTask {
593    pub table_stats_change: HashMap<TableId, PbTableStats>,
594    pub task_id: u64,
595    pub task_status: TaskStatus,
596    pub sorted_output_ssts: Vec<SstableInfo>,
597    pub object_timestamps: HashMap<HummockSstableObjectId, u64>,
598}
599
600impl From<PbReportTask> for ReportTask {
601    fn from(value: PbReportTask) -> Self {
602        Self {
603            table_stats_change: value.table_stats_change.clone(),
604            task_id: value.task_id,
605            task_status: PbTaskStatus::try_from(value.task_status).unwrap(),
606            sorted_output_ssts: value
607                .sorted_output_ssts
608                .into_iter()
609                .map(SstableInfo::from)
610                .collect_vec(),
611            object_timestamps: value.object_timestamps,
612        }
613    }
614}
615
616impl From<ReportTask> for PbReportTask {
617    fn from(value: ReportTask) -> Self {
618        Self {
619            table_stats_change: value.table_stats_change.clone(),
620            task_id: value.task_id,
621            task_status: value.task_status.into(),
622            sorted_output_ssts: value
623                .sorted_output_ssts
624                .into_iter()
625                .map(|sst| sst.into())
626                .collect_vec(),
627            object_timestamps: value.object_timestamps,
628        }
629    }
630}
631
#[cfg(test)]
mod tests {
    use risingwave_common::catalog::TableId;
    use risingwave_pb::hummock::PbLevelType;
    use risingwave_pb::hummock::compact_task::TaskType;

    use super::CompactTask;
    use crate::level::InputLevel;
    use crate::sstable_info::{SstableInfo, SstableInfoInner};

    /// SST whose object id equals its SST id and which carries `table_ids`.
    fn plain_sst(sst_id: u64, table_ids: Vec<TableId>) -> SstableInfo {
        let inner = SstableInfoInner {
            object_id: sst_id.into(),
            sst_id: sst_id.into(),
            table_ids,
            ..Default::default()
        };
        inner.into()
    }

    /// SST that trips every read-side property: it is a split SST (object id 1, SST id 2), has
    /// a range tombstone and holds 100 keys.
    fn property_rich_sst(table_ids: Vec<TableId>) -> SstableInfo {
        let inner = SstableInfoInner {
            object_id: 1.into(),
            sst_id: 2.into(),
            table_ids,
            range_tombstone_count: 1,
            total_key_count: 100,
            ..Default::default()
        };
        inner.into()
    }

    /// Input level with default index and type that holds `table_infos`.
    fn default_level(table_infos: Vec<SstableInfo>) -> InputLevel {
        InputLevel {
            table_infos,
            ..Default::default()
        }
    }

    /// Non-overlapping input level at `level_idx` that holds `table_infos`.
    fn nonoverlapping_level(level_idx: u32, table_infos: Vec<SstableInfo>) -> InputLevel {
        InputLevel {
            level_idx,
            level_type: PbLevelType::Nonoverlapping,
            table_infos,
        }
    }

    /// Dynamic task moving `source_ssts` from level 1 into an empty level 2.
    fn move_into_empty_level_task(source_ssts: Vec<SstableInfo>) -> CompactTask {
        CompactTask {
            input_ssts: vec![
                nonoverlapping_level(1, source_ssts),
                nonoverlapping_level(2, vec![]),
            ],
            target_level: 2,
            task_type: TaskType::Dynamic,
            ..Default::default()
        }
    }

    #[test]
    fn test_empty_table_ids_are_reclaim_and_have_no_input_table_ids() {
        let task = CompactTask {
            input_ssts: vec![default_level(vec![plain_sst(1, vec![])])],
            existing_table_ids: vec![TableId::new(1)],
            ..Default::default()
        };

        let mut input_table_ids = task.get_table_ids_from_input_ssts();
        assert!(input_table_ids.next().is_none());
        assert!(task.is_trivial_reclaim());
    }

    #[test]
    fn test_read_properties_ignore_empty_table_ids() {
        // The property-rich SST has no table ids, so only the small plain SST is readable.
        let small_sst = SstableInfo::from(SstableInfoInner {
            object_id: 3.into(),
            sst_id: 3.into(),
            table_ids: vec![TableId::new(1)],
            total_key_count: 1,
            ..Default::default()
        });
        let ignoring_task = CompactTask {
            input_ssts: vec![default_level(vec![property_rich_sst(vec![]), small_sst])],
            max_kv_count_for_xor16: Some(10),
            ..Default::default()
        };

        assert!(!ignoring_task.contains_range_tombstone());
        assert!(!ignoring_task.contains_split_sst());
        assert!(!ignoring_task.should_use_block_based_filter());

        // Once the same SST carries a table id, all of its properties become visible.
        let observing_task = CompactTask {
            input_ssts: vec![default_level(vec![property_rich_sst(vec![TableId::new(
                1,
            )])])],
            max_kv_count_for_xor16: Some(10),
            ..Default::default()
        };

        assert!(observing_task.contains_range_tombstone());
        assert!(observing_task.contains_split_sst());
        assert!(observing_task.should_use_block_based_filter());
    }

    #[test]
    fn test_task_label_for_trivial_move_and_reclaim() {
        // One SST still has a table id, so this is a move but not a reclaim.
        let trivial_move_task = move_into_empty_level_task(vec![
            plain_sst(10, vec![TableId::new(1)]),
            plain_sst(11, vec![]),
        ]);

        assert!(!trivial_move_task.is_trivial_reclaim());
        assert_eq!(trivial_move_task.task_label(), "trivial-move");

        // No SST has a table id: the task qualifies as both, and reclaim takes precedence.
        let trivial_reclaim_task = move_into_empty_level_task(vec![plain_sst(10, vec![])]);

        assert!(trivial_reclaim_task.is_trivial_reclaim());
        assert!(trivial_reclaim_task.is_trivial_move_task());
        assert_eq!(trivial_reclaim_task.task_label(), "trivial-space-reclaim");
    }
}
763}