risingwave_hummock_sdk/
compact_task.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::collections::{BTreeMap, HashMap, HashSet};
16use std::mem::size_of;
17
18use itertools::Itertools;
19use risingwave_pb::hummock::compact_task::{PbTaskStatus, PbTaskType, TaskStatus, TaskType};
20use risingwave_pb::hummock::subscribe_compaction_event_request::PbReportTask;
21use risingwave_pb::hummock::{
22    LevelType, PbCompactTask, PbKeyRange, PbTableOption, PbTableSchema, PbTableStats,
23    PbValidationTask,
24};
25
26use crate::HummockSstableObjectId;
27use crate::compaction_group::StateTableId;
28use crate::key_range::KeyRange;
29use crate::level::InputLevel;
30use crate::sstable_info::SstableInfo;
31use crate::table_watermark::{TableWatermarks, WatermarkSerdeType};
32
/// A compaction task as handled by the hummock SDK; converts to and from [`PbCompactTask`].
///
/// Unlike the protobuf message, which carries all table watermarks in one map, this type
/// keeps them split by watermark type into `pk_prefix_table_watermarks` and
/// `non_pk_prefix_table_watermarks`.
#[derive(Clone, PartialEq, Default, Debug)]
pub struct CompactTask {
    /// SSTs to be compacted, which will be removed from LSM after compaction
    pub input_ssts: Vec<InputLevel>,
    /// In ideal case, the compaction will generate `splits.len()` tables which have key range
    /// corresponding to that in `splits`, respectively
    pub splits: Vec<KeyRange>,
    /// compaction output, which will be added to `target_level` of LSM after compaction
    pub sorted_output_ssts: Vec<SstableInfo>,
    /// task id assigned by hummock storage service
    pub task_id: u64,
    /// compaction output will be added to `target_level` of LSM after compaction
    pub target_level: u32,
    /// NOTE(review): presumably whether delete markers may be dropped by this compaction —
    /// confirm against the compactor.
    pub gc_delete_keys: bool,
    /// Lbase in LSM
    pub base_level: u32,
    /// Current status of the task.
    pub task_status: PbTaskStatus,
    /// compaction group the task belongs to.
    pub compaction_group_id: u64,
    /// Version id of the compaction group recorded when the task is created. `is_expired`
    /// compares it with an expected version id to detect tasks created against another version.
    pub compaction_group_version_id: u64,
    /// Ids of state tables that still exist. A task whose input SSTs reference none of these
    /// tables is a trivial reclaim (`is_trivial_reclaim`), and `build_compact_table_ids` keeps
    /// only ids listed here.
    pub existing_table_ids: Vec<u32>,
    /// NOTE(review): encoded compression algorithm for the output — confirm the encoding.
    pub compression_algorithm: u32,
    /// NOTE(review): presumably the desired size of each output SST — confirm units.
    pub target_file_size: u64,
    /// NOTE(review): bit mask selecting compaction filters — confirm the bit meanings.
    pub compaction_filter_mask: u32,
    /// Per-table options keyed by table id; `contains_ttl` inspects their `retention_seconds`.
    pub table_options: BTreeMap<u32, PbTableOption>,
    /// NOTE(review): time captured for the task, presumably used for TTL reclaim — confirm.
    pub current_epoch_time: u64,
    /// NOTE(review): id of the sub level the output targets — confirm semantics.
    pub target_sub_level_id: u64,
    /// Kind of the task (e.g. `Dynamic`, `Emergency`, `VnodeWatermark`); it drives
    /// `is_trivial_move_task` and `is_trivial_reclaim`.
    pub task_type: PbTaskType,
    /// Deprecated. use `table_vnode_partition` instead;
    pub split_by_state_table: bool,
    /// Compaction needs to cut the state table every time 1/weight of vnodes in the table have been processed.
    /// Deprecated. use `table_vnode_partition` instead;
    pub split_weight_by_vnode: u32,
    /// Per-table vnode partition setting keyed by table id, replacing the deprecated
    /// `split_by_state_table` / `split_weight_by_vnode`. NOTE(review): confirm value meaning.
    pub table_vnode_partition: BTreeMap<u32, u32>,
    /// Table watermarks whose `watermark_type` is `WatermarkSerdeType::PkPrefix`, keyed by
    /// table id. In compaction we only use the table watermarks on safe epoch,
    /// so we only need to include the table watermarks on safe epoch to reduce the size of metadata.
    pub pk_prefix_table_watermarks: BTreeMap<u32, TableWatermarks>,

    /// Table watermarks of every other `watermark_type`, keyed by table id. Merged back with
    /// `pk_prefix_table_watermarks` into one map when converting to protobuf.
    pub non_pk_prefix_table_watermarks: BTreeMap<u32, TableWatermarks>,

    /// Table schemas keyed by table id. NOTE(review): confirm how the compactor uses them.
    pub table_schemas: BTreeMap<u32, PbTableSchema>,

    /// NOTE(review): presumably an upper bound on parallel sub-compactions — confirm.
    pub max_sub_compaction: u32,
}
80
81impl CompactTask {
82    pub fn estimated_encode_len(&self) -> usize {
83        self.input_ssts
84            .iter()
85            .map(|input_level| input_level.estimated_encode_len())
86            .sum::<usize>()
87            + self
88                .splits
89                .iter()
90                .map(|split| split.left.len() + split.right.len() + size_of::<bool>())
91                .sum::<usize>()
92            + size_of::<u64>()
93            + self
94                .sorted_output_ssts
95                .iter()
96                .map(|sst| sst.estimated_encode_len())
97                .sum::<usize>()
98            + size_of::<u64>()
99            + size_of::<u32>()
100            + size_of::<bool>()
101            + size_of::<u32>()
102            + size_of::<i32>()
103            + size_of::<u64>()
104            + self.existing_table_ids.len() * size_of::<u32>()
105            + size_of::<u32>()
106            + size_of::<u64>()
107            + size_of::<u32>()
108            + self.table_options.len() * size_of::<u64>()
109            + size_of::<u64>()
110            + size_of::<u64>()
111            + size_of::<i32>()
112            + size_of::<bool>()
113            + size_of::<u32>()
114            + self.table_vnode_partition.len() * size_of::<u64>()
115            + self
116                .pk_prefix_table_watermarks
117                .values()
118                .map(|table_watermark| size_of::<u32>() + table_watermark.estimated_encode_len())
119                .sum::<usize>()
120            + self
121                .non_pk_prefix_table_watermarks
122                .values()
123                .map(|table_watermark| size_of::<u32>() + table_watermark.estimated_encode_len())
124                .sum::<usize>()
125    }
126
127    pub fn is_trivial_move_task(&self) -> bool {
128        if self.task_type != TaskType::Dynamic && self.task_type != TaskType::Emergency {
129            return false;
130        }
131
132        if self.input_ssts.len() != 2 || self.input_ssts[0].level_type != LevelType::Nonoverlapping
133        {
134            return false;
135        }
136
137        // it may be a manual compaction task
138        if self.input_ssts[0].level_idx == self.input_ssts[1].level_idx
139            && self.input_ssts[0].level_idx > 0
140        {
141            return false;
142        }
143
144        if self.input_ssts[1].level_idx == self.target_level
145            && self.input_ssts[1].table_infos.is_empty()
146        {
147            return true;
148        }
149
150        false
151    }
152
153    pub fn is_trivial_reclaim(&self) -> bool {
154        // Currently all VnodeWatermark tasks are trivial reclaim.
155        if self.task_type == TaskType::VnodeWatermark {
156            return true;
157        }
158        let exist_table_ids = HashSet::<u32>::from_iter(self.existing_table_ids.clone());
159        self.input_ssts.iter().all(|level| {
160            level.table_infos.iter().all(|sst| {
161                sst.table_ids
162                    .iter()
163                    .all(|table_id| !exist_table_ids.contains(table_id))
164            })
165        })
166    }
167}
168
169impl CompactTask {
170    // The compact task may need to reclaim key with TTL
171    pub fn contains_ttl(&self) -> bool {
172        self.table_options
173            .iter()
174            .any(|(_, table_option)| table_option.retention_seconds.is_some_and(|ttl| ttl > 0))
175    }
176
177    // The compact task may need to reclaim key with range tombstone
178    pub fn contains_range_tombstone(&self) -> bool {
179        self.input_ssts
180            .iter()
181            .flat_map(|level| level.table_infos.iter())
182            .any(|sst| sst.range_tombstone_count > 0)
183    }
184
185    // The compact task may need to reclaim key with split sst
186    pub fn contains_split_sst(&self) -> bool {
187        self.input_ssts
188            .iter()
189            .flat_map(|level| level.table_infos.iter())
190            .any(|sst| sst.sst_id != sst.object_id)
191    }
192
193    pub fn get_table_ids_from_input_ssts(&self) -> impl Iterator<Item = StateTableId> + use<> {
194        self.input_ssts
195            .iter()
196            .flat_map(|level| level.table_infos.iter())
197            .flat_map(|sst| sst.table_ids.clone())
198            .sorted()
199            .dedup()
200    }
201
202    // filter the table-id that in existing_table_ids with the table-id in compact-task
203    pub fn build_compact_table_ids(&self) -> Vec<StateTableId> {
204        let existing_table_ids: HashSet<u32> = HashSet::from_iter(self.existing_table_ids.clone());
205        self.get_table_ids_from_input_ssts()
206            .filter(|table_id| existing_table_ids.contains(table_id))
207            .collect()
208    }
209
210    pub fn is_expired(&self, compaction_group_version_id_expected: u64) -> bool {
211        is_compaction_task_expired(
212            self.compaction_group_version_id,
213            compaction_group_version_id_expected,
214        )
215    }
216}
217
/// Reports whether a compaction task is expired.
///
/// A task is expired when the compaction group version id recorded in it no longer equals the
/// version id the caller expects.
pub fn is_compaction_task_expired(
    compaction_group_version_id_in_task: u64,
    compaction_group_version_id_expected: u64,
) -> bool {
    let versions_match = compaction_group_version_id_in_task == compaction_group_version_id_expected;
    !versions_match
}
224
225impl From<PbCompactTask> for CompactTask {
226    fn from(pb_compact_task: PbCompactTask) -> Self {
227        let (pk_prefix_table_watermarks, non_pk_prefix_table_watermarks) = pb_compact_task
228            .table_watermarks
229            .into_iter()
230            .map(|(table_id, pb_table_watermark)| {
231                let table_watermark = TableWatermarks::from(pb_table_watermark);
232                (table_id, table_watermark)
233            })
234            .partition(|(_table_id, table_watermarke)| {
235                matches!(
236                    table_watermarke.watermark_type,
237                    WatermarkSerdeType::PkPrefix
238                )
239            });
240
241        #[expect(deprecated)]
242        Self {
243            input_ssts: pb_compact_task
244                .input_ssts
245                .into_iter()
246                .map(InputLevel::from)
247                .collect_vec(),
248            splits: pb_compact_task
249                .splits
250                .into_iter()
251                .map(|pb_keyrange| KeyRange {
252                    left: pb_keyrange.left.into(),
253                    right: pb_keyrange.right.into(),
254                    right_exclusive: pb_keyrange.right_exclusive,
255                })
256                .collect_vec(),
257            sorted_output_ssts: pb_compact_task
258                .sorted_output_ssts
259                .into_iter()
260                .map(SstableInfo::from)
261                .collect_vec(),
262            task_id: pb_compact_task.task_id,
263            target_level: pb_compact_task.target_level,
264            gc_delete_keys: pb_compact_task.gc_delete_keys,
265            base_level: pb_compact_task.base_level,
266            task_status: TaskStatus::try_from(pb_compact_task.task_status).unwrap(),
267            compaction_group_id: pb_compact_task.compaction_group_id,
268            existing_table_ids: pb_compact_task.existing_table_ids.clone(),
269            compression_algorithm: pb_compact_task.compression_algorithm,
270            target_file_size: pb_compact_task.target_file_size,
271            compaction_filter_mask: pb_compact_task.compaction_filter_mask,
272            table_options: pb_compact_task.table_options.clone(),
273            current_epoch_time: pb_compact_task.current_epoch_time,
274            target_sub_level_id: pb_compact_task.target_sub_level_id,
275            task_type: PbTaskType::try_from(pb_compact_task.task_type).unwrap(),
276            split_by_state_table: pb_compact_task.split_by_state_table,
277            split_weight_by_vnode: pb_compact_task.split_weight_by_vnode,
278            table_vnode_partition: pb_compact_task.table_vnode_partition.clone(),
279            pk_prefix_table_watermarks,
280            non_pk_prefix_table_watermarks,
281            table_schemas: pb_compact_task.table_schemas,
282            max_sub_compaction: pb_compact_task.max_sub_compaction,
283            compaction_group_version_id: pb_compact_task.compaction_group_version_id,
284        }
285    }
286}
287
288impl From<&PbCompactTask> for CompactTask {
289    fn from(pb_compact_task: &PbCompactTask) -> Self {
290        let (pk_prefix_table_watermarks, non_pk_prefix_table_watermarks) = pb_compact_task
291            .table_watermarks
292            .iter()
293            .map(|(table_id, pb_table_watermark)| {
294                let table_watermark = TableWatermarks::from(pb_table_watermark);
295                (*table_id, table_watermark)
296            })
297            .partition(|(_table_id, table_watermarke)| {
298                matches!(
299                    table_watermarke.watermark_type,
300                    WatermarkSerdeType::PkPrefix
301                )
302            });
303
304        #[expect(deprecated)]
305        Self {
306            input_ssts: pb_compact_task
307                .input_ssts
308                .iter()
309                .map(InputLevel::from)
310                .collect_vec(),
311            splits: pb_compact_task
312                .splits
313                .iter()
314                .map(|pb_keyrange| KeyRange {
315                    left: pb_keyrange.left.clone().into(),
316                    right: pb_keyrange.right.clone().into(),
317                    right_exclusive: pb_keyrange.right_exclusive,
318                })
319                .collect_vec(),
320            sorted_output_ssts: pb_compact_task
321                .sorted_output_ssts
322                .iter()
323                .map(SstableInfo::from)
324                .collect_vec(),
325            task_id: pb_compact_task.task_id,
326            target_level: pb_compact_task.target_level,
327            gc_delete_keys: pb_compact_task.gc_delete_keys,
328            base_level: pb_compact_task.base_level,
329            task_status: TaskStatus::try_from(pb_compact_task.task_status).unwrap(),
330            compaction_group_id: pb_compact_task.compaction_group_id,
331            existing_table_ids: pb_compact_task.existing_table_ids.clone(),
332            compression_algorithm: pb_compact_task.compression_algorithm,
333            target_file_size: pb_compact_task.target_file_size,
334            compaction_filter_mask: pb_compact_task.compaction_filter_mask,
335            table_options: pb_compact_task.table_options.clone(),
336            current_epoch_time: pb_compact_task.current_epoch_time,
337            target_sub_level_id: pb_compact_task.target_sub_level_id,
338            task_type: PbTaskType::try_from(pb_compact_task.task_type).unwrap(),
339            split_by_state_table: pb_compact_task.split_by_state_table,
340            split_weight_by_vnode: pb_compact_task.split_weight_by_vnode,
341            table_vnode_partition: pb_compact_task.table_vnode_partition.clone(),
342            pk_prefix_table_watermarks,
343            non_pk_prefix_table_watermarks,
344            table_schemas: pb_compact_task.table_schemas.clone(),
345            max_sub_compaction: pb_compact_task.max_sub_compaction,
346            compaction_group_version_id: pb_compact_task.compaction_group_version_id,
347        }
348    }
349}
350
351impl From<CompactTask> for PbCompactTask {
352    fn from(compact_task: CompactTask) -> Self {
353        #[expect(deprecated)]
354        Self {
355            input_ssts: compact_task
356                .input_ssts
357                .into_iter()
358                .map(|input_level| input_level.into())
359                .collect_vec(),
360            splits: compact_task
361                .splits
362                .into_iter()
363                .map(|keyrange| PbKeyRange {
364                    left: keyrange.left.into(),
365                    right: keyrange.right.into(),
366                    right_exclusive: keyrange.right_exclusive,
367                })
368                .collect_vec(),
369            sorted_output_ssts: compact_task
370                .sorted_output_ssts
371                .into_iter()
372                .map(|sst| sst.into())
373                .collect_vec(),
374            task_id: compact_task.task_id,
375            target_level: compact_task.target_level,
376            gc_delete_keys: compact_task.gc_delete_keys,
377            base_level: compact_task.base_level,
378            task_status: compact_task.task_status.into(),
379            compaction_group_id: compact_task.compaction_group_id,
380            existing_table_ids: compact_task.existing_table_ids.clone(),
381            compression_algorithm: compact_task.compression_algorithm,
382            target_file_size: compact_task.target_file_size,
383            compaction_filter_mask: compact_task.compaction_filter_mask,
384            table_options: compact_task.table_options.clone(),
385            current_epoch_time: compact_task.current_epoch_time,
386            target_sub_level_id: compact_task.target_sub_level_id,
387            task_type: compact_task.task_type.into(),
388            split_weight_by_vnode: compact_task.split_weight_by_vnode,
389            table_vnode_partition: compact_task.table_vnode_partition.clone(),
390            table_watermarks: compact_task
391                .pk_prefix_table_watermarks
392                .into_iter()
393                .chain(compact_task.non_pk_prefix_table_watermarks)
394                .map(|(table_id, table_watermark)| (table_id, table_watermark.into()))
395                .collect(),
396            split_by_state_table: compact_task.split_by_state_table,
397            table_schemas: compact_task.table_schemas.clone(),
398            max_sub_compaction: compact_task.max_sub_compaction,
399            compaction_group_version_id: compact_task.compaction_group_version_id,
400        }
401    }
402}
403
404impl From<&CompactTask> for PbCompactTask {
405    fn from(compact_task: &CompactTask) -> Self {
406        #[expect(deprecated)]
407        Self {
408            input_ssts: compact_task
409                .input_ssts
410                .iter()
411                .map(|input_level| input_level.into())
412                .collect_vec(),
413            splits: compact_task
414                .splits
415                .iter()
416                .map(|keyrange| PbKeyRange {
417                    left: keyrange.left.to_vec(),
418                    right: keyrange.right.to_vec(),
419                    right_exclusive: keyrange.right_exclusive,
420                })
421                .collect_vec(),
422            sorted_output_ssts: compact_task
423                .sorted_output_ssts
424                .iter()
425                .map(|sst| sst.into())
426                .collect_vec(),
427            task_id: compact_task.task_id,
428            target_level: compact_task.target_level,
429            gc_delete_keys: compact_task.gc_delete_keys,
430            base_level: compact_task.base_level,
431            task_status: compact_task.task_status.into(),
432            compaction_group_id: compact_task.compaction_group_id,
433            existing_table_ids: compact_task.existing_table_ids.clone(),
434            compression_algorithm: compact_task.compression_algorithm,
435            target_file_size: compact_task.target_file_size,
436            compaction_filter_mask: compact_task.compaction_filter_mask,
437            table_options: compact_task.table_options.clone(),
438            current_epoch_time: compact_task.current_epoch_time,
439            target_sub_level_id: compact_task.target_sub_level_id,
440            task_type: compact_task.task_type.into(),
441            split_weight_by_vnode: compact_task.split_weight_by_vnode,
442            table_vnode_partition: compact_task.table_vnode_partition.clone(),
443            table_watermarks: compact_task
444                .pk_prefix_table_watermarks
445                .iter()
446                .chain(compact_task.non_pk_prefix_table_watermarks.iter())
447                .map(|(table_id, table_watermark)| (*table_id, table_watermark.into()))
448                .collect(),
449            split_by_state_table: compact_task.split_by_state_table,
450            table_schemas: compact_task.table_schemas.clone(),
451            max_sub_compaction: compact_task.max_sub_compaction,
452            compaction_group_version_id: compact_task.compaction_group_version_id,
453        }
454    }
455}
456
457#[derive(Clone, PartialEq, Default)]
458pub struct ValidationTask {
459    pub sst_infos: Vec<SstableInfo>,
460    pub sst_id_to_worker_id: HashMap<u64, u32>,
461}
462
463impl From<PbValidationTask> for ValidationTask {
464    fn from(pb_validation_task: PbValidationTask) -> Self {
465        Self {
466            sst_infos: pb_validation_task
467                .sst_infos
468                .into_iter()
469                .map(SstableInfo::from)
470                .collect_vec(),
471            sst_id_to_worker_id: pb_validation_task.sst_id_to_worker_id.clone(),
472        }
473    }
474}
475
476impl From<ValidationTask> for PbValidationTask {
477    fn from(validation_task: ValidationTask) -> Self {
478        Self {
479            sst_infos: validation_task
480                .sst_infos
481                .into_iter()
482                .map(|sst| sst.into())
483                .collect_vec(),
484            sst_id_to_worker_id: validation_task.sst_id_to_worker_id.clone(),
485        }
486    }
487}
488
489impl ValidationTask {
490    pub fn estimated_encode_len(&self) -> usize {
491        self.sst_infos
492            .iter()
493            .map(|sst| sst.estimated_encode_len())
494            .sum::<usize>()
495            + self.sst_id_to_worker_id.len() * (size_of::<u64>() + size_of::<u32>())
496            + size_of::<u64>()
497    }
498}
499
/// The outcome of a compaction task as reported back; converts to and from [`PbReportTask`].
#[derive(Clone, PartialEq, Default, Debug)]
pub struct ReportTask {
    /// Table statistics changes keyed by table id. NOTE(review): presumably the stats delta
    /// produced by the compaction — confirm.
    pub table_stats_change: HashMap<u32, PbTableStats>,
    /// Id of the task being reported.
    pub task_id: u64,
    /// Final status of the task.
    pub task_status: TaskStatus,
    /// SSTs produced by the task.
    pub sorted_output_ssts: Vec<SstableInfo>,
    /// A `u64` per SST object id. NOTE(review): confirm what the timestamp records and its units.
    pub object_timestamps: HashMap<HummockSstableObjectId, u64>,
}
508
509impl From<PbReportTask> for ReportTask {
510    fn from(value: PbReportTask) -> Self {
511        Self {
512            table_stats_change: value.table_stats_change.clone(),
513            task_id: value.task_id,
514            task_status: PbTaskStatus::try_from(value.task_status).unwrap(),
515            sorted_output_ssts: value
516                .sorted_output_ssts
517                .into_iter()
518                .map(SstableInfo::from)
519                .collect_vec(),
520            object_timestamps: value.object_timestamps,
521        }
522    }
523}
524
525impl From<ReportTask> for PbReportTask {
526    fn from(value: ReportTask) -> Self {
527        Self {
528            table_stats_change: value.table_stats_change.clone(),
529            task_id: value.task_id,
530            task_status: value.task_status.into(),
531            sorted_output_ssts: value
532                .sorted_output_ssts
533                .into_iter()
534                .map(|sst| sst.into())
535                .collect_vec(),
536            object_timestamps: value.object_timestamps,
537        }
538    }
539}