risingwave_hummock_sdk/compact_task.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeMap, HashMap, HashSet};
use std::mem::size_of;

use itertools::Itertools;
use risingwave_common::catalog::TableId;
use risingwave_pb::hummock::compact_task::{PbTaskStatus, PbTaskType, TaskStatus, TaskType};
use risingwave_pb::hummock::subscribe_compaction_event_request::PbReportTask;
use risingwave_pb::hummock::{
    LevelType, PbCompactTask, PbKeyRange, PbTableOption, PbTableSchema, PbTableStats,
    PbValidationTask,
};

use crate::HummockSstableObjectId;
use crate::compaction_group::StateTableId;
use crate::key_range::KeyRange;
use crate::level::InputLevel;
use crate::sstable_info::SstableInfo;
use crate::table_watermark::{TableWatermarks, WatermarkSerdeType};

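/// A single compaction task: the input SSTs to be compacted, the generated output, and the
/// parameters that control how the compaction is executed.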
#[derive(Clone, PartialEq, Default, Debug)]
pub struct CompactTask {
    /// SSTs to be compacted, which will be removed from the LSM tree after compaction
    pub input_ssts: Vec<InputLevel>,
    /// In the ideal case, the compaction will generate `splits.len()` tables whose key ranges
    /// correspond to those in `splits`, respectively
    pub splits: Vec<KeyRange>,
    /// compaction output, which will be added to `target_level` of the LSM tree after compaction
    pub sorted_output_ssts: Vec<SstableInfo>,
    /// task id assigned by hummock storage service
    pub task_id: u64,
    /// the compaction output will be added to `target_level` of the LSM tree after compaction
    pub target_level: u32,
    pub gc_delete_keys: bool,
    /// Lbase in LSM
    pub base_level: u32,
    pub task_status: PbTaskStatus,
    /// compaction group the task belongs to.
    pub compaction_group_id: u64,
    /// version id of the compaction group when the compaction task was created
    pub compaction_group_version_id: u64,
    /// `existing_table_ids` used by compaction to decide which keys can be dropped
    pub existing_table_ids: Vec<TableId>,
    pub compression_algorithm: u32,
    pub target_file_size: u64,
    pub compaction_filter_mask: u32,
    pub table_options: BTreeMap<TableId, PbTableOption>,
    pub current_epoch_time: u64,
    pub target_sub_level_id: u64,
    /// Identifies whether the task is `space_reclaim`; if the number of `compact_task_type`s
    /// grows, this will be refactored into an enum
    pub task_type: PbTaskType,
    /// Deprecated. Use `table_vnode_partition` instead.
    pub split_by_state_table: bool,
    /// Compaction needs to cut the state table every time 1/weight of vnodes in the table have been processed.
    /// Deprecated. Use `table_vnode_partition` instead.
    pub split_weight_by_vnode: u32,
    pub table_vnode_partition: BTreeMap<TableId, u32>,
    /// The table watermarks, keyed by table id. In compaction we only use the table watermarks on
    /// the safe epoch, so we only need to include those to reduce the size of the metadata.
    pub pk_prefix_table_watermarks: BTreeMap<TableId, TableWatermarks>,

    pub non_pk_prefix_table_watermarks: BTreeMap<TableId, TableWatermarks>,

    pub table_schemas: BTreeMap<TableId, PbTableSchema>,

    pub max_sub_compaction: u32,
}

impl CompactTask {
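    /// Estimates the encoded size of this task in bytes, by summing the estimated sizes of its
    /// variable-length fields plus the fixed-size scalar fields.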
    pub fn estimated_encode_len(&self) -> usize {
        self.input_ssts
            .iter()
            .map(|input_level| input_level.estimated_encode_len())
            .sum::<usize>()
            + self
                .splits
                .iter()
                .map(|split| split.left.len() + split.right.len() + size_of::<bool>())
                .sum::<usize>()
            + size_of::<u64>()
            + self
                .sorted_output_ssts
                .iter()
                .map(|sst| sst.estimated_encode_len())
                .sum::<usize>()
            + size_of::<u64>()
            + size_of::<u32>()
            + size_of::<bool>()
            + size_of::<u32>()
            + size_of::<i32>()
            + size_of::<u64>()
            + self.existing_table_ids.len() * size_of::<u32>()
            + size_of::<u32>()
            + size_of::<u64>()
            + size_of::<u32>()
            + self.table_options.len() * size_of::<u64>()
            + size_of::<u64>()
            + size_of::<u64>()
            + size_of::<i32>()
            + size_of::<bool>()
            + size_of::<u32>()
            + self.table_vnode_partition.len() * size_of::<u64>()
            + self
                .pk_prefix_table_watermarks
                .values()
                .map(|table_watermark| size_of::<u32>() + table_watermark.estimated_encode_len())
                .sum::<usize>()
            + self
                .non_pk_prefix_table_watermarks
                .values()
                .map(|table_watermark| size_of::<u32>() + table_watermark.estimated_encode_len())
                .sum::<usize>()
    }

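    /// Returns true if the task can be completed as a trivial move, i.e. the input SSTs can be
    /// moved to the target level directly without being rewritten.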
    pub fn is_trivial_move_task(&self) -> bool {
        if self.task_type != TaskType::Dynamic && self.task_type != TaskType::Emergency {
            return false;
        }

        if self.input_ssts.len() != 2 || self.input_ssts[0].level_type != LevelType::Nonoverlapping
        {
            return false;
        }

        // it may be a manual compaction task
        if self.input_ssts[0].level_idx == self.input_ssts[1].level_idx
            && self.input_ssts[0].level_idx > 0
        {
            return false;
        }

        if self.input_ssts[1].level_idx == self.target_level
            && self.input_ssts[1].table_infos.is_empty()
        {
            return true;
        }

        false
    }

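    /// Returns true if the task only reclaims data: either it is a `VnodeWatermark` task, or
    /// none of the table ids referenced by the input SSTs still exist.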
    pub fn is_trivial_reclaim(&self) -> bool {
        // Currently all VnodeWatermark tasks are trivial reclaim.
        if self.task_type == TaskType::VnodeWatermark {
            return true;
        }
        let exist_table_ids =
            HashSet::<TableId>::from_iter(self.existing_table_ids.iter().copied());
        self.input_ssts.iter().all(|level| {
            level.table_infos.iter().all(|sst| {
                sst.table_ids
                    .iter()
                    .all(|table_id| !exist_table_ids.contains(table_id))
            })
        })
    }
}

impl CompactTask {
    /// The compact task may need to reclaim keys with TTL.
    pub fn contains_ttl(&self) -> bool {
        self.table_options
            .iter()
            .any(|(_, table_option)| table_option.retention_seconds.is_some_and(|ttl| ttl > 0))
    }

    /// The compact task may need to reclaim keys covered by range tombstones.
    pub fn contains_range_tombstone(&self) -> bool {
        self.input_ssts
            .iter()
            .flat_map(|level| level.table_infos.iter())
            .any(|sst| sst.range_tombstone_count > 0)
    }

    /// The compact task may need to reclaim keys from split SSTs, i.e. SSTs whose `sst_id`
    /// differs from their `object_id`.
    pub fn contains_split_sst(&self) -> bool {
        self.input_ssts
            .iter()
            .flat_map(|level| level.table_infos.iter())
            .any(|sst| sst.sst_id.inner() != sst.object_id.inner())
    }

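    /// Returns the deduplicated, sorted table ids that appear in the input SSTs of this task.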
    pub fn get_table_ids_from_input_ssts(&self) -> impl Iterator<Item = StateTableId> + use<> {
        self.input_ssts
            .iter()
            .flat_map(|level| level.table_infos.iter())
            .flat_map(|sst| sst.table_ids.clone())
            .sorted()
            .dedup()
    }

    /// Filters the table ids in the compact task, keeping only those present in `existing_table_ids`.
    pub fn build_compact_table_ids(&self) -> Vec<StateTableId> {
        let existing_table_ids: HashSet<TableId> =
            HashSet::from_iter(self.existing_table_ids.clone());
        self.get_table_ids_from_input_ssts()
            .filter(|table_id| existing_table_ids.contains(table_id))
            .collect()
    }

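    /// Returns true if the task's compaction group version id no longer matches the expected
    /// version id, meaning the task has expired.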
    pub fn is_expired(&self, compaction_group_version_id_expected: u64) -> bool {
        is_compaction_task_expired(
            self.compaction_group_version_id,
            compaction_group_version_id_expected,
        )
    }
}

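/// A compaction task is considered expired when the compaction group version id recorded in the
/// task differs from the expected compaction group version id.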
pub fn is_compaction_task_expired(
    compaction_group_version_id_in_task: u64,
    compaction_group_version_id_expected: u64,
) -> bool {
    compaction_group_version_id_in_task != compaction_group_version_id_expected
}

impl From<PbCompactTask> for CompactTask {
    fn from(pb_compact_task: PbCompactTask) -> Self {
        let (pk_prefix_table_watermarks, non_pk_prefix_table_watermarks) = pb_compact_task
            .table_watermarks
            .into_iter()
            .map(|(table_id, pb_table_watermark)| {
                let table_watermark = TableWatermarks::from(pb_table_watermark);
                (table_id, table_watermark)
            })
            .partition(|(_table_id, table_watermark)| {
                matches!(
                    table_watermark.watermark_type,
                    WatermarkSerdeType::PkPrefix
                )
            });

        #[expect(deprecated)]
        Self {
            input_ssts: pb_compact_task
                .input_ssts
                .into_iter()
                .map(InputLevel::from)
                .collect_vec(),
            splits: pb_compact_task
                .splits
                .into_iter()
                .map(|pb_keyrange| KeyRange {
                    left: pb_keyrange.left.into(),
                    right: pb_keyrange.right.into(),
                    right_exclusive: pb_keyrange.right_exclusive,
                })
                .collect_vec(),
            sorted_output_ssts: pb_compact_task
                .sorted_output_ssts
                .into_iter()
                .map(SstableInfo::from)
                .collect_vec(),
            task_id: pb_compact_task.task_id,
            target_level: pb_compact_task.target_level,
            gc_delete_keys: pb_compact_task.gc_delete_keys,
            base_level: pb_compact_task.base_level,
            task_status: TaskStatus::try_from(pb_compact_task.task_status).unwrap(),
            compaction_group_id: pb_compact_task.compaction_group_id,
            existing_table_ids: pb_compact_task.existing_table_ids.clone(),
            compression_algorithm: pb_compact_task.compression_algorithm,
            target_file_size: pb_compact_task.target_file_size,
            compaction_filter_mask: pb_compact_task.compaction_filter_mask,
            table_options: pb_compact_task
                .table_options
                .iter()
                .map(|(table_id, v)| (*table_id, *v))
                .collect(),
            current_epoch_time: pb_compact_task.current_epoch_time,
            target_sub_level_id: pb_compact_task.target_sub_level_id,
            task_type: PbTaskType::try_from(pb_compact_task.task_type).unwrap(),
            split_by_state_table: pb_compact_task.split_by_state_table,
            split_weight_by_vnode: pb_compact_task.split_weight_by_vnode,
            table_vnode_partition: pb_compact_task
                .table_vnode_partition
                .iter()
                .map(|(table_id, v)| (*table_id, *v))
                .collect(),
            pk_prefix_table_watermarks,
            non_pk_prefix_table_watermarks,
            table_schemas: pb_compact_task
                .table_schemas
                .iter()
                .map(|(table_id, v)| (*table_id, v.clone()))
                .collect(),
            max_sub_compaction: pb_compact_task.max_sub_compaction,
            compaction_group_version_id: pb_compact_task.compaction_group_version_id,
        }
    }
}

impl From<&PbCompactTask> for CompactTask {
    fn from(pb_compact_task: &PbCompactTask) -> Self {
        let (pk_prefix_table_watermarks, non_pk_prefix_table_watermarks) = pb_compact_task
            .table_watermarks
            .iter()
            .map(|(table_id, pb_table_watermark)| {
                let table_watermark = TableWatermarks::from(pb_table_watermark);
                (*table_id, table_watermark)
            })
            .partition(|(_table_id, table_watermark)| {
                matches!(
                    table_watermark.watermark_type,
                    WatermarkSerdeType::PkPrefix
                )
            });

        #[expect(deprecated)]
        Self {
            input_ssts: pb_compact_task
                .input_ssts
                .iter()
                .map(InputLevel::from)
                .collect_vec(),
            splits: pb_compact_task
                .splits
                .iter()
                .map(|pb_keyrange| KeyRange {
                    left: pb_keyrange.left.clone().into(),
                    right: pb_keyrange.right.clone().into(),
                    right_exclusive: pb_keyrange.right_exclusive,
                })
                .collect_vec(),
            sorted_output_ssts: pb_compact_task
                .sorted_output_ssts
                .iter()
                .map(SstableInfo::from)
                .collect_vec(),
            task_id: pb_compact_task.task_id,
            target_level: pb_compact_task.target_level,
            gc_delete_keys: pb_compact_task.gc_delete_keys,
            base_level: pb_compact_task.base_level,
            task_status: TaskStatus::try_from(pb_compact_task.task_status).unwrap(),
            compaction_group_id: pb_compact_task.compaction_group_id,
            existing_table_ids: pb_compact_task.existing_table_ids.clone(),
            compression_algorithm: pb_compact_task.compression_algorithm,
            target_file_size: pb_compact_task.target_file_size,
            compaction_filter_mask: pb_compact_task.compaction_filter_mask,
            table_options: pb_compact_task
                .table_options
                .iter()
                .map(|(table_id, v)| (*table_id, *v))
                .collect(),
            current_epoch_time: pb_compact_task.current_epoch_time,
            target_sub_level_id: pb_compact_task.target_sub_level_id,
            task_type: PbTaskType::try_from(pb_compact_task.task_type).unwrap(),
            split_by_state_table: pb_compact_task.split_by_state_table,
            split_weight_by_vnode: pb_compact_task.split_weight_by_vnode,
            table_vnode_partition: pb_compact_task
                .table_vnode_partition
                .iter()
                .map(|(table_id, v)| (*table_id, *v))
                .collect(),
            pk_prefix_table_watermarks,
            non_pk_prefix_table_watermarks,
            table_schemas: pb_compact_task
                .table_schemas
                .iter()
                .map(|(table_id, v)| (*table_id, v.clone()))
                .collect(),
            max_sub_compaction: pb_compact_task.max_sub_compaction,
            compaction_group_version_id: pb_compact_task.compaction_group_version_id,
        }
    }
}

impl From<CompactTask> for PbCompactTask {
    fn from(compact_task: CompactTask) -> Self {
        #[expect(deprecated)]
        Self {
            input_ssts: compact_task
                .input_ssts
                .into_iter()
                .map(|input_level| input_level.into())
                .collect_vec(),
            splits: compact_task
                .splits
                .into_iter()
                .map(|keyrange| PbKeyRange {
                    left: keyrange.left.into(),
                    right: keyrange.right.into(),
                    right_exclusive: keyrange.right_exclusive,
                })
                .collect_vec(),
            sorted_output_ssts: compact_task
                .sorted_output_ssts
                .into_iter()
                .map(|sst| sst.into())
                .collect_vec(),
            task_id: compact_task.task_id,
            target_level: compact_task.target_level,
            gc_delete_keys: compact_task.gc_delete_keys,
            base_level: compact_task.base_level,
            task_status: compact_task.task_status.into(),
            compaction_group_id: compact_task.compaction_group_id,
            existing_table_ids: compact_task.existing_table_ids.clone(),
            compression_algorithm: compact_task.compression_algorithm,
            target_file_size: compact_task.target_file_size,
            compaction_filter_mask: compact_task.compaction_filter_mask,
            table_options: compact_task.table_options.clone(),
            current_epoch_time: compact_task.current_epoch_time,
            target_sub_level_id: compact_task.target_sub_level_id,
            task_type: compact_task.task_type.into(),
            split_weight_by_vnode: compact_task.split_weight_by_vnode,
            table_vnode_partition: compact_task.table_vnode_partition.clone(),
            table_watermarks: compact_task
                .pk_prefix_table_watermarks
                .into_iter()
                .chain(compact_task.non_pk_prefix_table_watermarks)
                .map(|(table_id, table_watermark)| (table_id, table_watermark.into()))
                .collect(),
            split_by_state_table: compact_task.split_by_state_table,
            table_schemas: compact_task.table_schemas.clone(),
            max_sub_compaction: compact_task.max_sub_compaction,
            compaction_group_version_id: compact_task.compaction_group_version_id,
        }
    }
}

impl From<&CompactTask> for PbCompactTask {
    fn from(compact_task: &CompactTask) -> Self {
        #[expect(deprecated)]
        Self {
            input_ssts: compact_task
                .input_ssts
                .iter()
                .map(|input_level| input_level.into())
                .collect_vec(),
            splits: compact_task
                .splits
                .iter()
                .map(|keyrange| PbKeyRange {
                    left: keyrange.left.to_vec(),
                    right: keyrange.right.to_vec(),
                    right_exclusive: keyrange.right_exclusive,
                })
                .collect_vec(),
            sorted_output_ssts: compact_task
                .sorted_output_ssts
                .iter()
                .map(|sst| sst.into())
                .collect_vec(),
            task_id: compact_task.task_id,
            target_level: compact_task.target_level,
            gc_delete_keys: compact_task.gc_delete_keys,
            base_level: compact_task.base_level,
            task_status: compact_task.task_status.into(),
            compaction_group_id: compact_task.compaction_group_id,
            existing_table_ids: compact_task.existing_table_ids.clone(),
            compression_algorithm: compact_task.compression_algorithm,
            target_file_size: compact_task.target_file_size,
            compaction_filter_mask: compact_task.compaction_filter_mask,
            table_options: compact_task.table_options.clone(),
            current_epoch_time: compact_task.current_epoch_time,
            target_sub_level_id: compact_task.target_sub_level_id,
            task_type: compact_task.task_type.into(),
            split_weight_by_vnode: compact_task.split_weight_by_vnode,
            table_vnode_partition: compact_task.table_vnode_partition.clone(),
            table_watermarks: compact_task
                .pk_prefix_table_watermarks
                .iter()
                .chain(compact_task.non_pk_prefix_table_watermarks.iter())
                .map(|(table_id, table_watermark)| (*table_id, table_watermark.into()))
                .collect(),
            split_by_state_table: compact_task.split_by_state_table,
            table_schemas: compact_task.table_schemas.clone(),
            max_sub_compaction: compact_task.max_sub_compaction,
            compaction_group_version_id: compact_task.compaction_group_version_id,
        }
    }
}

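/// A task for validating SSTs: the SSTs to check and, for each SST object id, the worker id
/// associated with that SST.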
#[derive(Clone, PartialEq, Default)]
pub struct ValidationTask {
    pub sst_infos: Vec<SstableInfo>,
    pub sst_id_to_worker_id: HashMap<HummockSstableObjectId, u32>,
}

impl From<PbValidationTask> for ValidationTask {
    fn from(pb_validation_task: PbValidationTask) -> Self {
        Self {
            sst_infos: pb_validation_task
                .sst_infos
                .into_iter()
                .map(SstableInfo::from)
                .collect_vec(),
            sst_id_to_worker_id: pb_validation_task
                .sst_id_to_worker_id
                .into_iter()
                .map(|(object_id, worker_id)| (object_id.into(), worker_id))
                .collect(),
        }
    }
}

impl From<ValidationTask> for PbValidationTask {
    fn from(validation_task: ValidationTask) -> Self {
        Self {
            sst_infos: validation_task
                .sst_infos
                .into_iter()
                .map(|sst| sst.into())
                .collect_vec(),
            sst_id_to_worker_id: validation_task
                .sst_id_to_worker_id
                .into_iter()
                .map(|(object_id, worker_id)| (object_id.inner(), worker_id))
                .collect(),
        }
    }
}

impl ValidationTask {
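    /// Estimates the encoded size of this task in bytes.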
    pub fn estimated_encode_len(&self) -> usize {
        self.sst_infos
            .iter()
            .map(|sst| sst.estimated_encode_len())
            .sum::<usize>()
            + self.sst_id_to_worker_id.len() * (size_of::<u64>() + size_of::<u32>())
            + size_of::<u64>()
    }
}

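/// The reported result of a compaction task: its status, the produced SSTs, the per-table
/// statistics change, and the object timestamps.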
#[derive(Clone, PartialEq, Default, Debug)]
pub struct ReportTask {
    pub table_stats_change: HashMap<TableId, PbTableStats>,
    pub task_id: u64,
    pub task_status: TaskStatus,
    pub sorted_output_ssts: Vec<SstableInfo>,
    pub object_timestamps: HashMap<HummockSstableObjectId, u64>,
}

impl From<PbReportTask> for ReportTask {
    fn from(value: PbReportTask) -> Self {
        Self {
            table_stats_change: value.table_stats_change.clone(),
            task_id: value.task_id,
            task_status: PbTaskStatus::try_from(value.task_status).unwrap(),
            sorted_output_ssts: value
                .sorted_output_ssts
                .into_iter()
                .map(SstableInfo::from)
                .collect_vec(),
            object_timestamps: value
                .object_timestamps
                .into_iter()
                .map(|(object_id, timestamp)| (object_id.into(), timestamp))
                .collect(),
        }
    }
}

impl From<ReportTask> for PbReportTask {
    fn from(value: ReportTask) -> Self {
        Self {
            table_stats_change: value.table_stats_change.clone(),
            task_id: value.task_id,
            task_status: value.task_status.into(),
            sorted_output_ssts: value
                .sorted_output_ssts
                .into_iter()
                .map(|sst| sst.into())
                .collect_vec(),
            object_timestamps: value
                .object_timestamps
                .into_iter()
                .map(|(object_id, timestamp)| (object_id.inner(), timestamp))
                .collect(),
        }
    }
}
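
// Minimal round-trip sketches for the protobuf conversions above. They only rely on the
// `Default` and `PartialEq` impls of the prost-generated types and on the `From` impls in
// this module, and they exercise default values only, not fully populated tasks.
#[cfg(test)]
mod tests {
    use super::*;

    /// Converting a default `PbCompactTask` into `CompactTask` and back should preserve the
    /// (default) field values, since every field is mapped explicitly in the `From` impls.
    #[test]
    fn test_default_compact_task_pb_roundtrip() {
        let pb = PbCompactTask::default();
        let task = CompactTask::from(&pb);
        let pb_roundtrip = PbCompactTask::from(&task);
        assert_eq!(pb, pb_roundtrip);
    }

    /// A default `ValidationTask` should also survive a protobuf round trip.
    #[test]
    fn test_default_validation_task_pb_roundtrip() {
        let pb = PbValidationTask::default();
        let task = ValidationTask::from(pb.clone());
        let pb_roundtrip = PbValidationTask::from(task);
        assert_eq!(pb, pb_roundtrip);
    }
}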