risingwave_hummock_sdk/
compact_task.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, HashMap, HashSet};
16use std::mem::size_of;
17
18use itertools::Itertools;
19use risingwave_common::catalog::TableId;
20use risingwave_pb::hummock::compact_task::{PbTaskStatus, PbTaskType, TaskStatus, TaskType};
21use risingwave_pb::hummock::subscribe_compaction_event_request::PbReportTask;
22use risingwave_pb::hummock::{
23    LevelType, PbCompactTask, PbKeyRange, PbTableOption, PbTableSchema, PbTableStats,
24    PbValidationTask,
25};
26
27use crate::HummockSstableObjectId;
28use crate::compaction_group::StateTableId;
29use crate::key_range::KeyRange;
30use crate::level::InputLevel;
31use crate::sstable_info::SstableInfo;
32use crate::table_watermark::{TableWatermarks, WatermarkSerdeType};
33
34#[derive(Clone, PartialEq, Default, Debug)]
35pub struct CompactTask {
36    /// SSTs to be compacted, which will be removed from LSM after compaction
37    pub input_ssts: Vec<InputLevel>,
38    /// In ideal case, the compaction will generate `splits.len()` tables which have key range
39    /// corresponding to that in `splits`, respectively
40    pub splits: Vec<KeyRange>,
41    /// compaction output, which will be added to `target_level` of LSM after compaction
42    pub sorted_output_ssts: Vec<SstableInfo>,
43    /// task id assigned by hummock storage service
44    pub task_id: u64,
45    /// compaction output will be added to `target_level` of LSM after compaction
46    pub target_level: u32,
47    pub gc_delete_keys: bool,
48    /// Lbase in LSM
49    pub base_level: u32,
50    pub task_status: PbTaskStatus,
51    /// compaction group the task belongs to.
52    pub compaction_group_id: u64,
53    /// compaction group id when the compaction task is created
54    pub compaction_group_version_id: u64,
55    /// `existing_table_ids` for compaction drop key
56    pub existing_table_ids: Vec<TableId>,
57    pub compression_algorithm: u32,
58    pub target_file_size: u64,
59    pub compaction_filter_mask: u32,
60    pub table_options: BTreeMap<TableId, PbTableOption>,
61    pub current_epoch_time: u64,
62    pub target_sub_level_id: u64,
63    /// Identifies whether the task is `space_reclaim`, if the `compact_task_type` increases, it will be refactored to enum
64    pub task_type: PbTaskType,
65    /// Deprecated. use `table_vnode_partition` instead;
66    pub split_by_state_table: bool,
67    /// Compaction needs to cut the state table every time 1/weight of vnodes in the table have been processed.
68    /// Deprecated. use `table_vnode_partition` instead;
69    pub split_weight_by_vnode: u32,
70    pub table_vnode_partition: BTreeMap<TableId, u32>,
71    /// The table watermark of any table id. In compaction we only use the table watermarks on safe epoch,
72    /// so we only need to include the table watermarks on safe epoch to reduce the size of metadata.
73    pub pk_prefix_table_watermarks: BTreeMap<TableId, TableWatermarks>,
74
75    pub non_pk_prefix_table_watermarks: BTreeMap<TableId, TableWatermarks>,
76
77    pub table_schemas: BTreeMap<TableId, PbTableSchema>,
78
79    pub max_sub_compaction: u32,
80
81    pub max_kv_count_for_xor16: Option<u64>,
82}
83
84impl CompactTask {
85    pub fn estimated_encode_len(&self) -> usize {
86        self.input_ssts
87            .iter()
88            .map(|input_level| input_level.estimated_encode_len())
89            .sum::<usize>()
90            + self
91                .splits
92                .iter()
93                .map(|split| split.left.len() + split.right.len() + size_of::<bool>())
94                .sum::<usize>()
95            + size_of::<u64>()
96            + self
97                .sorted_output_ssts
98                .iter()
99                .map(|sst| sst.estimated_encode_len())
100                .sum::<usize>()
101            + size_of::<u64>()
102            + size_of::<u32>()
103            + size_of::<bool>()
104            + size_of::<u32>()
105            + size_of::<i32>()
106            + size_of::<u64>()
107            + self.existing_table_ids.len() * size_of::<u32>()
108            + size_of::<u32>()
109            + size_of::<u64>()
110            + size_of::<u32>()
111            + self.table_options.len() * size_of::<u64>()
112            + size_of::<u64>()
113            + size_of::<u64>()
114            + size_of::<i32>()
115            + size_of::<bool>()
116            + size_of::<u32>()
117            + self.table_vnode_partition.len() * size_of::<u64>()
118            + self
119                .pk_prefix_table_watermarks
120                .values()
121                .map(|table_watermark| size_of::<u32>() + table_watermark.estimated_encode_len())
122                .sum::<usize>()
123            + self
124                .non_pk_prefix_table_watermarks
125                .values()
126                .map(|table_watermark| size_of::<u32>() + table_watermark.estimated_encode_len())
127                .sum::<usize>()
128    }
129
130    pub fn is_trivial_move_task(&self) -> bool {
131        if self.task_type != TaskType::Dynamic && self.task_type != TaskType::Emergency {
132            return false;
133        }
134
135        if self.input_ssts.len() != 2 || self.input_ssts[0].level_type != LevelType::Nonoverlapping
136        {
137            return false;
138        }
139
140        // it may be a manual compaction task
141        if self.input_ssts[0].level_idx == self.input_ssts[1].level_idx
142            && self.input_ssts[0].level_idx > 0
143        {
144            return false;
145        }
146
147        if self.input_ssts[1].level_idx == self.target_level
148            && self.input_ssts[1].table_infos.is_empty()
149        {
150            return true;
151        }
152
153        false
154    }
155
156    pub fn is_trivial_reclaim(&self) -> bool {
157        // Currently all VnodeWatermark tasks are trivial reclaim.
158        if self.task_type == TaskType::VnodeWatermark {
159            return true;
160        }
161        let exist_table_ids =
162            HashSet::<TableId>::from_iter(self.existing_table_ids.iter().copied());
163        self.input_ssts.iter().all(|level| {
164            level.table_infos.iter().all(|sst| {
165                sst.table_ids
166                    .iter()
167                    .all(|table_id| !exist_table_ids.contains(table_id))
168            })
169        })
170    }
171}
172
173impl CompactTask {
174    // The compact task may need to reclaim key with TTL
175    pub fn contains_ttl(&self) -> bool {
176        self.table_options
177            .iter()
178            .any(|(_, table_option)| table_option.retention_seconds.is_some_and(|ttl| ttl > 0))
179    }
180
181    // The compact task may need to reclaim key with range tombstone
182    pub fn contains_range_tombstone(&self) -> bool {
183        self.input_ssts
184            .iter()
185            .flat_map(|level| level.table_infos.iter())
186            .any(|sst| sst.range_tombstone_count > 0)
187    }
188
189    // The compact task may need to reclaim key with split sst
190    pub fn contains_split_sst(&self) -> bool {
191        self.input_ssts
192            .iter()
193            .flat_map(|level| level.table_infos.iter())
194            .any(|sst| sst.sst_id.inner() != sst.object_id.inner())
195    }
196
197    pub fn get_table_ids_from_input_ssts(&self) -> impl Iterator<Item = StateTableId> + use<> {
198        self.input_ssts
199            .iter()
200            .flat_map(|level| level.table_infos.iter())
201            .flat_map(|sst| sst.table_ids.clone())
202            .sorted()
203            .dedup()
204    }
205
206    // filter the table-id that in existing_table_ids with the table-id in compact-task
207    pub fn build_compact_table_ids(&self) -> Vec<StateTableId> {
208        let existing_table_ids: HashSet<TableId> =
209            HashSet::from_iter(self.existing_table_ids.clone());
210        self.get_table_ids_from_input_ssts()
211            .filter(|table_id| existing_table_ids.contains(table_id))
212            .collect()
213    }
214
215    pub fn is_expired(&self, compaction_group_version_id_expected: u64) -> bool {
216        is_compaction_task_expired(
217            self.compaction_group_version_id,
218            compaction_group_version_id_expected,
219        )
220    }
221
222    /// Determines whether to use block-based filter for this compaction task.
223    /// Returns true if the total key count exceeds the configured threshold.
224    pub fn should_use_block_based_filter(&self) -> bool {
225        let kv_count = self
226            .input_ssts
227            .iter()
228            .flat_map(|level| level.table_infos.iter())
229            .map(|sst| sst.total_key_count)
230            .sum::<u64>();
231
232        crate::filter_utils::is_kv_count_too_large_for_xor16(kv_count, self.max_kv_count_for_xor16)
233    }
234}
235
/// Returns `true` when the compaction group version id recorded in a task no longer equals
/// the expected version id.
pub fn is_compaction_task_expired(
    compaction_group_version_id_in_task: u64,
    compaction_group_version_id_expected: u64,
) -> bool {
    let versions_match =
        compaction_group_version_id_in_task == compaction_group_version_id_expected;
    !versions_match
}
242
243impl From<PbCompactTask> for CompactTask {
244    fn from(pb_compact_task: PbCompactTask) -> Self {
245        let (pk_prefix_table_watermarks, non_pk_prefix_table_watermarks) = pb_compact_task
246            .table_watermarks
247            .into_iter()
248            .map(|(table_id, pb_table_watermark)| {
249                let table_watermark = TableWatermarks::from(pb_table_watermark);
250                (table_id, table_watermark)
251            })
252            .partition(|(_table_id, table_watermarke)| {
253                matches!(
254                    table_watermarke.watermark_type,
255                    WatermarkSerdeType::PkPrefix
256                )
257            });
258
259        #[expect(deprecated)]
260        Self {
261            input_ssts: pb_compact_task
262                .input_ssts
263                .into_iter()
264                .map(InputLevel::from)
265                .collect_vec(),
266            splits: pb_compact_task
267                .splits
268                .into_iter()
269                .map(|pb_keyrange| KeyRange {
270                    left: pb_keyrange.left.into(),
271                    right: pb_keyrange.right.into(),
272                    right_exclusive: pb_keyrange.right_exclusive,
273                })
274                .collect_vec(),
275            sorted_output_ssts: pb_compact_task
276                .sorted_output_ssts
277                .into_iter()
278                .map(SstableInfo::from)
279                .collect_vec(),
280            task_id: pb_compact_task.task_id,
281            target_level: pb_compact_task.target_level,
282            gc_delete_keys: pb_compact_task.gc_delete_keys,
283            base_level: pb_compact_task.base_level,
284            task_status: TaskStatus::try_from(pb_compact_task.task_status).unwrap(),
285            compaction_group_id: pb_compact_task.compaction_group_id,
286            existing_table_ids: pb_compact_task.existing_table_ids.clone(),
287            compression_algorithm: pb_compact_task.compression_algorithm,
288            target_file_size: pb_compact_task.target_file_size,
289            compaction_filter_mask: pb_compact_task.compaction_filter_mask,
290            table_options: pb_compact_task
291                .table_options
292                .iter()
293                .map(|(table_id, v)| (*table_id, *v))
294                .collect(),
295            current_epoch_time: pb_compact_task.current_epoch_time,
296            target_sub_level_id: pb_compact_task.target_sub_level_id,
297            task_type: PbTaskType::try_from(pb_compact_task.task_type).unwrap(),
298            split_by_state_table: pb_compact_task.split_by_state_table,
299            split_weight_by_vnode: pb_compact_task.split_weight_by_vnode,
300            table_vnode_partition: pb_compact_task
301                .table_vnode_partition
302                .iter()
303                .map(|(table_id, v)| (*table_id, *v))
304                .collect(),
305            pk_prefix_table_watermarks,
306            non_pk_prefix_table_watermarks,
307            table_schemas: pb_compact_task
308                .table_schemas
309                .iter()
310                .map(|(table_id, v)| (*table_id, v.clone()))
311                .collect(),
312            max_sub_compaction: pb_compact_task.max_sub_compaction,
313            compaction_group_version_id: pb_compact_task.compaction_group_version_id,
314            max_kv_count_for_xor16: pb_compact_task.max_kv_count_for_xor16,
315        }
316    }
317}
318
319impl From<&PbCompactTask> for CompactTask {
320    fn from(pb_compact_task: &PbCompactTask) -> Self {
321        let (pk_prefix_table_watermarks, non_pk_prefix_table_watermarks) = pb_compact_task
322            .table_watermarks
323            .iter()
324            .map(|(table_id, pb_table_watermark)| {
325                let table_watermark = TableWatermarks::from(pb_table_watermark);
326                (*table_id, table_watermark)
327            })
328            .partition(|(_table_id, table_watermarke)| {
329                matches!(
330                    table_watermarke.watermark_type,
331                    WatermarkSerdeType::PkPrefix
332                )
333            });
334
335        #[expect(deprecated)]
336        Self {
337            input_ssts: pb_compact_task
338                .input_ssts
339                .iter()
340                .map(InputLevel::from)
341                .collect_vec(),
342            splits: pb_compact_task
343                .splits
344                .iter()
345                .map(|pb_keyrange| KeyRange {
346                    left: pb_keyrange.left.clone().into(),
347                    right: pb_keyrange.right.clone().into(),
348                    right_exclusive: pb_keyrange.right_exclusive,
349                })
350                .collect_vec(),
351            sorted_output_ssts: pb_compact_task
352                .sorted_output_ssts
353                .iter()
354                .map(SstableInfo::from)
355                .collect_vec(),
356            task_id: pb_compact_task.task_id,
357            target_level: pb_compact_task.target_level,
358            gc_delete_keys: pb_compact_task.gc_delete_keys,
359            base_level: pb_compact_task.base_level,
360            task_status: TaskStatus::try_from(pb_compact_task.task_status).unwrap(),
361            compaction_group_id: pb_compact_task.compaction_group_id,
362            existing_table_ids: pb_compact_task.existing_table_ids.clone(),
363            compression_algorithm: pb_compact_task.compression_algorithm,
364            target_file_size: pb_compact_task.target_file_size,
365            compaction_filter_mask: pb_compact_task.compaction_filter_mask,
366            table_options: pb_compact_task
367                .table_options
368                .iter()
369                .map(|(table_id, v)| (*table_id, *v))
370                .collect(),
371            current_epoch_time: pb_compact_task.current_epoch_time,
372            target_sub_level_id: pb_compact_task.target_sub_level_id,
373            task_type: PbTaskType::try_from(pb_compact_task.task_type).unwrap(),
374            split_by_state_table: pb_compact_task.split_by_state_table,
375            split_weight_by_vnode: pb_compact_task.split_weight_by_vnode,
376            table_vnode_partition: pb_compact_task
377                .table_vnode_partition
378                .iter()
379                .map(|(table_id, v)| (*table_id, *v))
380                .collect(),
381            pk_prefix_table_watermarks,
382            non_pk_prefix_table_watermarks,
383            table_schemas: pb_compact_task
384                .table_schemas
385                .iter()
386                .map(|(table_id, v)| (*table_id, v.clone()))
387                .collect(),
388            max_sub_compaction: pb_compact_task.max_sub_compaction,
389            compaction_group_version_id: pb_compact_task.compaction_group_version_id,
390            max_kv_count_for_xor16: pb_compact_task.max_kv_count_for_xor16,
391        }
392    }
393}
394
395impl From<CompactTask> for PbCompactTask {
396    fn from(compact_task: CompactTask) -> Self {
397        #[expect(deprecated)]
398        Self {
399            input_ssts: compact_task
400                .input_ssts
401                .into_iter()
402                .map(|input_level| input_level.into())
403                .collect_vec(),
404            splits: compact_task
405                .splits
406                .into_iter()
407                .map(|keyrange| PbKeyRange {
408                    left: keyrange.left.into(),
409                    right: keyrange.right.into(),
410                    right_exclusive: keyrange.right_exclusive,
411                })
412                .collect_vec(),
413            sorted_output_ssts: compact_task
414                .sorted_output_ssts
415                .into_iter()
416                .map(|sst| sst.into())
417                .collect_vec(),
418            task_id: compact_task.task_id,
419            target_level: compact_task.target_level,
420            gc_delete_keys: compact_task.gc_delete_keys,
421            base_level: compact_task.base_level,
422            task_status: compact_task.task_status.into(),
423            compaction_group_id: compact_task.compaction_group_id,
424            existing_table_ids: compact_task.existing_table_ids.clone(),
425            compression_algorithm: compact_task.compression_algorithm,
426            target_file_size: compact_task.target_file_size,
427            compaction_filter_mask: compact_task.compaction_filter_mask,
428            table_options: compact_task.table_options.clone(),
429            current_epoch_time: compact_task.current_epoch_time,
430            target_sub_level_id: compact_task.target_sub_level_id,
431            task_type: compact_task.task_type.into(),
432            split_weight_by_vnode: compact_task.split_weight_by_vnode,
433            table_vnode_partition: compact_task.table_vnode_partition.clone(),
434            table_watermarks: compact_task
435                .pk_prefix_table_watermarks
436                .into_iter()
437                .chain(compact_task.non_pk_prefix_table_watermarks)
438                .map(|(table_id, table_watermark)| (table_id, table_watermark.into()))
439                .collect(),
440            split_by_state_table: compact_task.split_by_state_table,
441            table_schemas: compact_task.table_schemas.clone(),
442            max_sub_compaction: compact_task.max_sub_compaction,
443            compaction_group_version_id: compact_task.compaction_group_version_id,
444            max_kv_count_for_xor16: compact_task.max_kv_count_for_xor16,
445        }
446    }
447}
448
449impl From<&CompactTask> for PbCompactTask {
450    fn from(compact_task: &CompactTask) -> Self {
451        #[expect(deprecated)]
452        Self {
453            input_ssts: compact_task
454                .input_ssts
455                .iter()
456                .map(|input_level| input_level.into())
457                .collect_vec(),
458            splits: compact_task
459                .splits
460                .iter()
461                .map(|keyrange| PbKeyRange {
462                    left: keyrange.left.to_vec(),
463                    right: keyrange.right.to_vec(),
464                    right_exclusive: keyrange.right_exclusive,
465                })
466                .collect_vec(),
467            sorted_output_ssts: compact_task
468                .sorted_output_ssts
469                .iter()
470                .map(|sst| sst.into())
471                .collect_vec(),
472            task_id: compact_task.task_id,
473            target_level: compact_task.target_level,
474            gc_delete_keys: compact_task.gc_delete_keys,
475            base_level: compact_task.base_level,
476            task_status: compact_task.task_status.into(),
477            compaction_group_id: compact_task.compaction_group_id,
478            existing_table_ids: compact_task.existing_table_ids.clone(),
479            compression_algorithm: compact_task.compression_algorithm,
480            target_file_size: compact_task.target_file_size,
481            compaction_filter_mask: compact_task.compaction_filter_mask,
482            table_options: compact_task.table_options.clone(),
483            current_epoch_time: compact_task.current_epoch_time,
484            target_sub_level_id: compact_task.target_sub_level_id,
485            task_type: compact_task.task_type.into(),
486            split_weight_by_vnode: compact_task.split_weight_by_vnode,
487            table_vnode_partition: compact_task.table_vnode_partition.clone(),
488            table_watermarks: compact_task
489                .pk_prefix_table_watermarks
490                .iter()
491                .chain(compact_task.non_pk_prefix_table_watermarks.iter())
492                .map(|(table_id, table_watermark)| (*table_id, table_watermark.into()))
493                .collect(),
494            split_by_state_table: compact_task.split_by_state_table,
495            table_schemas: compact_task.table_schemas.clone(),
496            max_sub_compaction: compact_task.max_sub_compaction,
497            compaction_group_version_id: compact_task.compaction_group_version_id,
498            max_kv_count_for_xor16: compact_task.max_kv_count_for_xor16,
499        }
500    }
501}
502
503#[derive(Clone, PartialEq, Default)]
504pub struct ValidationTask {
505    pub sst_infos: Vec<SstableInfo>,
506    pub sst_id_to_worker_id: HashMap<HummockSstableObjectId, u32>,
507}
508
509impl From<PbValidationTask> for ValidationTask {
510    fn from(pb_validation_task: PbValidationTask) -> Self {
511        Self {
512            sst_infos: pb_validation_task
513                .sst_infos
514                .into_iter()
515                .map(SstableInfo::from)
516                .collect_vec(),
517            sst_id_to_worker_id: pb_validation_task
518                .sst_id_to_worker_id
519                .into_iter()
520                .map(|(object_id, worker_id)| (object_id.into(), worker_id))
521                .collect(),
522        }
523    }
524}
525
526impl From<ValidationTask> for PbValidationTask {
527    fn from(validation_task: ValidationTask) -> Self {
528        Self {
529            sst_infos: validation_task
530                .sst_infos
531                .into_iter()
532                .map(|sst| sst.into())
533                .collect_vec(),
534            sst_id_to_worker_id: validation_task
535                .sst_id_to_worker_id
536                .into_iter()
537                .map(|(object_id, worker_id)| (object_id.inner(), worker_id))
538                .collect(),
539        }
540    }
541}
542
543impl ValidationTask {
544    pub fn estimated_encode_len(&self) -> usize {
545        self.sst_infos
546            .iter()
547            .map(|sst| sst.estimated_encode_len())
548            .sum::<usize>()
549            + self.sst_id_to_worker_id.len() * (size_of::<u64>() + size_of::<u32>())
550            + size_of::<u64>()
551    }
552}
553
554#[derive(Clone, PartialEq, Default, Debug)]
555pub struct ReportTask {
556    pub table_stats_change: HashMap<TableId, PbTableStats>,
557    pub task_id: u64,
558    pub task_status: TaskStatus,
559    pub sorted_output_ssts: Vec<SstableInfo>,
560    pub object_timestamps: HashMap<HummockSstableObjectId, u64>,
561}
562
563impl From<PbReportTask> for ReportTask {
564    fn from(value: PbReportTask) -> Self {
565        Self {
566            table_stats_change: value.table_stats_change.clone(),
567            task_id: value.task_id,
568            task_status: PbTaskStatus::try_from(value.task_status).unwrap(),
569            sorted_output_ssts: value
570                .sorted_output_ssts
571                .into_iter()
572                .map(SstableInfo::from)
573                .collect_vec(),
574            object_timestamps: value
575                .object_timestamps
576                .into_iter()
577                .map(|(object_id, timestamp)| (object_id.into(), timestamp))
578                .collect(),
579        }
580    }
581}
582
583impl From<ReportTask> for PbReportTask {
584    fn from(value: ReportTask) -> Self {
585        Self {
586            table_stats_change: value.table_stats_change.clone(),
587            task_id: value.task_id,
588            task_status: value.task_status.into(),
589            sorted_output_ssts: value
590                .sorted_output_ssts
591                .into_iter()
592                .map(|sst| sst.into())
593                .collect_vec(),
594            object_timestamps: value
595                .object_timestamps
596                .into_iter()
597                .map(|(object_id, timestamp)| (object_id.inner(), timestamp))
598                .collect(),
599        }
600    }
601}