risingwave_hummock_sdk/
compact_task.rs

// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::collections::{BTreeMap, HashMap, HashSet};
16use std::mem::size_of;
17
18use itertools::Itertools;
19use risingwave_common::catalog::TableId;
20use risingwave_pb::hummock::compact_task::{PbTaskStatus, PbTaskType, TaskStatus, TaskType};
21use risingwave_pb::hummock::subscribe_compaction_event_request::PbReportTask;
22use risingwave_pb::hummock::{
23    LevelType, PbCompactTask, PbKeyRange, PbTableOption, PbTableSchema, PbTableStats,
24    PbValidationTask,
25};
26use risingwave_pb::id::WorkerId;
27
28use crate::compaction_group::StateTableId;
29use crate::key_range::KeyRange;
30use crate::level::InputLevel;
31use crate::sstable_info::SstableInfo;
32use crate::table_watermark::{TableWatermarks, WatermarkSerdeType};
33use crate::{CompactionGroupId, HummockSstableObjectId};
34
35#[derive(Clone, PartialEq, Default, Debug)]
36pub struct CompactTask {
37    /// SSTs to be compacted, which will be removed from LSM after compaction
38    pub input_ssts: Vec<InputLevel>,
39    /// In ideal case, the compaction will generate `splits.len()` tables which have key range
40    /// corresponding to that in `splits`, respectively
41    pub splits: Vec<KeyRange>,
42    /// compaction output, which will be added to `target_level` of LSM after compaction
43    pub sorted_output_ssts: Vec<SstableInfo>,
44    /// task id assigned by hummock storage service
45    pub task_id: u64,
46    /// compaction output will be added to `target_level` of LSM after compaction
47    pub target_level: u32,
48    pub gc_delete_keys: bool,
49    /// Lbase in LSM
50    pub base_level: u32,
51    pub task_status: PbTaskStatus,
52    /// compaction group the task belongs to.
53    pub compaction_group_id: CompactionGroupId,
54    /// compaction group id when the compaction task is created
55    pub compaction_group_version_id: u64,
56    /// `existing_table_ids` for compaction drop key
57    pub existing_table_ids: Vec<TableId>,
58    pub compression_algorithm: u32,
59    pub target_file_size: u64,
60    pub compaction_filter_mask: u32,
61    pub table_options: BTreeMap<TableId, PbTableOption>,
62    pub current_epoch_time: u64,
63    pub target_sub_level_id: u64,
64    /// Identifies whether the task is `space_reclaim`, if the `compact_task_type` increases, it will be refactored to enum
65    pub task_type: PbTaskType,
66    /// Deprecated. use `table_vnode_partition` instead;
67    pub split_by_state_table: bool,
68    /// Compaction needs to cut the state table every time 1/weight of vnodes in the table have been processed.
69    /// Deprecated. use `table_vnode_partition` instead;
70    pub split_weight_by_vnode: u32,
71    pub table_vnode_partition: BTreeMap<TableId, u32>,
72    /// The table watermark of any table id. In compaction we only use the table watermarks on safe epoch,
73    /// so we only need to include the table watermarks on safe epoch to reduce the size of metadata.
74    pub pk_prefix_table_watermarks: BTreeMap<TableId, TableWatermarks>,
75
76    pub non_pk_prefix_table_watermarks: BTreeMap<TableId, TableWatermarks>,
77    pub value_table_watermarks: BTreeMap<TableId, TableWatermarks>,
78
79    pub table_schemas: BTreeMap<TableId, PbTableSchema>,
80
81    pub max_sub_compaction: u32,
82
83    pub max_kv_count_for_xor16: Option<u64>,
84}
85
86impl CompactTask {
87    pub fn estimated_encode_len(&self) -> usize {
88        self.input_ssts
89            .iter()
90            .map(|input_level| input_level.estimated_encode_len())
91            .sum::<usize>()
92            + self
93                .splits
94                .iter()
95                .map(|split| split.left.len() + split.right.len() + size_of::<bool>())
96                .sum::<usize>()
97            + size_of::<u64>()
98            + self
99                .sorted_output_ssts
100                .iter()
101                .map(|sst| sst.estimated_encode_len())
102                .sum::<usize>()
103            + size_of::<u64>()
104            + size_of::<u32>()
105            + size_of::<bool>()
106            + size_of::<u32>()
107            + size_of::<i32>()
108            + size_of::<u64>()
109            + self.existing_table_ids.len() * size_of::<u32>()
110            + size_of::<u32>()
111            + size_of::<u64>()
112            + size_of::<u32>()
113            + self.table_options.len() * size_of::<u64>()
114            + size_of::<u64>()
115            + size_of::<u64>()
116            + size_of::<i32>()
117            + size_of::<bool>()
118            + size_of::<u32>()
119            + self.table_vnode_partition.len() * size_of::<u64>()
120            + self
121                .pk_prefix_table_watermarks
122                .values()
123                .map(|table_watermark| size_of::<u32>() + table_watermark.estimated_encode_len())
124                .sum::<usize>()
125            + self
126                .non_pk_prefix_table_watermarks
127                .values()
128                .map(|table_watermark| size_of::<u32>() + table_watermark.estimated_encode_len())
129                .sum::<usize>()
130            + self
131                .value_table_watermarks
132                .values()
133                .map(|table_watermark| size_of::<u32>() + table_watermark.estimated_encode_len())
134                .sum::<usize>()
135    }
136
137    pub fn is_trivial_move_task(&self) -> bool {
138        if self.task_type != TaskType::Dynamic && self.task_type != TaskType::Emergency {
139            return false;
140        }
141
142        if self.input_ssts.len() != 2 || self.input_ssts[0].level_type != LevelType::Nonoverlapping
143        {
144            return false;
145        }
146
147        // it may be a manual compaction task
148        if self.input_ssts[0].level_idx == self.input_ssts[1].level_idx
149            && self.input_ssts[0].level_idx > 0
150        {
151            return false;
152        }
153
154        if self.input_ssts[1].level_idx == self.target_level
155            && self.input_ssts[1].table_infos.is_empty()
156        {
157            return true;
158        }
159
160        false
161    }
162
163    pub fn is_trivial_reclaim(&self) -> bool {
164        // Currently all VnodeWatermark tasks are trivial reclaim.
165        if self.task_type == TaskType::VnodeWatermark {
166            return true;
167        }
168        let exist_table_ids =
169            HashSet::<TableId>::from_iter(self.existing_table_ids.iter().copied());
170        self.input_ssts.iter().all(|level| {
171            level.table_infos.iter().all(|sst| {
172                sst.table_ids
173                    .iter()
174                    .all(|table_id| !exist_table_ids.contains(table_id))
175            })
176        })
177    }
178}
179
180impl CompactTask {
181    // The compact task may need to reclaim key with TTL
182    pub fn contains_ttl(&self) -> bool {
183        self.table_options
184            .iter()
185            .any(|(_, table_option)| table_option.retention_seconds.is_some_and(|ttl| ttl > 0))
186    }
187
188    // The compact task may need to reclaim key with range tombstone
189    pub fn contains_range_tombstone(&self) -> bool {
190        self.input_ssts
191            .iter()
192            .flat_map(|level| level.table_infos.iter())
193            .any(|sst| sst.range_tombstone_count > 0)
194    }
195
196    // The compact task may need to reclaim key with split sst
197    pub fn contains_split_sst(&self) -> bool {
198        self.input_ssts
199            .iter()
200            .flat_map(|level| level.table_infos.iter())
201            .any(|sst| sst.sst_id.as_raw_id() != sst.object_id.as_raw_id())
202    }
203
204    pub fn get_table_ids_from_input_ssts(&self) -> impl Iterator<Item = StateTableId> + use<> {
205        self.input_ssts
206            .iter()
207            .flat_map(|level| level.table_infos.iter())
208            .flat_map(|sst| sst.table_ids.clone())
209            .sorted()
210            .dedup()
211    }
212
213    // filter the table-id that in existing_table_ids with the table-id in compact-task
214    pub fn build_compact_table_ids(&self) -> Vec<StateTableId> {
215        let existing_table_ids: HashSet<TableId> =
216            HashSet::from_iter(self.existing_table_ids.clone());
217        self.get_table_ids_from_input_ssts()
218            .filter(|table_id| existing_table_ids.contains(table_id))
219            .collect()
220    }
221
222    pub fn is_expired(&self, compaction_group_version_id_expected: u64) -> bool {
223        is_compaction_task_expired(
224            self.compaction_group_version_id,
225            compaction_group_version_id_expected,
226        )
227    }
228
229    /// Determines whether to use block-based filter for this compaction task.
230    /// Returns true if the total key count exceeds the configured threshold.
231    pub fn should_use_block_based_filter(&self) -> bool {
232        let kv_count = self
233            .input_ssts
234            .iter()
235            .flat_map(|level| level.table_infos.iter())
236            .map(|sst| sst.total_key_count)
237            .sum::<u64>();
238
239        crate::filter_utils::is_kv_count_too_large_for_xor16(kv_count, self.max_kv_count_for_xor16)
240    }
241}
242
243fn split_watermark_serde_types(
244    pb_compact_task: &PbCompactTask,
245) -> (
246    BTreeMap<TableId, TableWatermarks>,
247    BTreeMap<TableId, TableWatermarks>,
248    BTreeMap<TableId, TableWatermarks>,
249) {
250    let mut pk_prefix_table_watermarks = BTreeMap::default();
251    let mut non_pk_prefix_table_watermarks = BTreeMap::default();
252    let mut value_table_watermarks = BTreeMap::default();
253    for (table_id, pbwatermark) in &pb_compact_task.table_watermarks {
254        let watermark = TableWatermarks::from(pbwatermark);
255        match watermark.watermark_type {
256            WatermarkSerdeType::PkPrefix => {
257                pk_prefix_table_watermarks.insert(*table_id, watermark);
258            }
259            WatermarkSerdeType::NonPkPrefix => {
260                non_pk_prefix_table_watermarks.insert(*table_id, watermark);
261            }
262            WatermarkSerdeType::Value => {
263                value_table_watermarks.insert(*table_id, watermark);
264            }
265        }
266    }
267    (
268        pk_prefix_table_watermarks,
269        non_pk_prefix_table_watermarks,
270        value_table_watermarks,
271    )
272}
273
/// Returns `true` when the compaction-group version id recorded in a task no longer matches
/// the expected version id.
pub fn is_compaction_task_expired(
    compaction_group_version_id_in_task: u64,
    compaction_group_version_id_expected: u64,
) -> bool {
    let still_current = compaction_group_version_id_in_task == compaction_group_version_id_expected;
    !still_current
}
280
281impl From<PbCompactTask> for CompactTask {
282    fn from(pb_compact_task: PbCompactTask) -> Self {
283        let (pk_prefix_table_watermarks, non_pk_prefix_table_watermarks, value_table_watermarks) =
284            split_watermark_serde_types(&pb_compact_task);
285        #[expect(deprecated)]
286        Self {
287            input_ssts: pb_compact_task
288                .input_ssts
289                .into_iter()
290                .map(InputLevel::from)
291                .collect_vec(),
292            splits: pb_compact_task
293                .splits
294                .into_iter()
295                .map(|pb_keyrange| KeyRange {
296                    left: pb_keyrange.left.into(),
297                    right: pb_keyrange.right.into(),
298                    right_exclusive: pb_keyrange.right_exclusive,
299                })
300                .collect_vec(),
301            sorted_output_ssts: pb_compact_task
302                .sorted_output_ssts
303                .into_iter()
304                .map(SstableInfo::from)
305                .collect_vec(),
306            task_id: pb_compact_task.task_id,
307            target_level: pb_compact_task.target_level,
308            gc_delete_keys: pb_compact_task.gc_delete_keys,
309            base_level: pb_compact_task.base_level,
310            task_status: TaskStatus::try_from(pb_compact_task.task_status).unwrap(),
311            compaction_group_id: pb_compact_task.compaction_group_id,
312            existing_table_ids: pb_compact_task.existing_table_ids.clone(),
313            compression_algorithm: pb_compact_task.compression_algorithm,
314            target_file_size: pb_compact_task.target_file_size,
315            compaction_filter_mask: pb_compact_task.compaction_filter_mask,
316            table_options: pb_compact_task
317                .table_options
318                .iter()
319                .map(|(table_id, v)| (*table_id, *v))
320                .collect(),
321            current_epoch_time: pb_compact_task.current_epoch_time,
322            target_sub_level_id: pb_compact_task.target_sub_level_id,
323            task_type: PbTaskType::try_from(pb_compact_task.task_type).unwrap(),
324            split_by_state_table: pb_compact_task.split_by_state_table,
325            split_weight_by_vnode: pb_compact_task.split_weight_by_vnode,
326            table_vnode_partition: pb_compact_task
327                .table_vnode_partition
328                .iter()
329                .map(|(table_id, v)| (*table_id, *v))
330                .collect(),
331            pk_prefix_table_watermarks,
332            non_pk_prefix_table_watermarks,
333            value_table_watermarks,
334            table_schemas: pb_compact_task
335                .table_schemas
336                .iter()
337                .map(|(table_id, v)| (*table_id, v.clone()))
338                .collect(),
339            max_sub_compaction: pb_compact_task.max_sub_compaction,
340            compaction_group_version_id: pb_compact_task.compaction_group_version_id,
341            max_kv_count_for_xor16: pb_compact_task.max_kv_count_for_xor16,
342        }
343    }
344}
345
346impl From<&PbCompactTask> for CompactTask {
347    fn from(pb_compact_task: &PbCompactTask) -> Self {
348        let (pk_prefix_table_watermarks, non_pk_prefix_table_watermarks, value_table_watermarks) =
349            split_watermark_serde_types(pb_compact_task);
350        #[expect(deprecated)]
351        Self {
352            input_ssts: pb_compact_task
353                .input_ssts
354                .iter()
355                .map(InputLevel::from)
356                .collect_vec(),
357            splits: pb_compact_task
358                .splits
359                .iter()
360                .map(|pb_keyrange| KeyRange {
361                    left: pb_keyrange.left.clone().into(),
362                    right: pb_keyrange.right.clone().into(),
363                    right_exclusive: pb_keyrange.right_exclusive,
364                })
365                .collect_vec(),
366            sorted_output_ssts: pb_compact_task
367                .sorted_output_ssts
368                .iter()
369                .map(SstableInfo::from)
370                .collect_vec(),
371            task_id: pb_compact_task.task_id,
372            target_level: pb_compact_task.target_level,
373            gc_delete_keys: pb_compact_task.gc_delete_keys,
374            base_level: pb_compact_task.base_level,
375            task_status: TaskStatus::try_from(pb_compact_task.task_status).unwrap(),
376            compaction_group_id: pb_compact_task.compaction_group_id,
377            existing_table_ids: pb_compact_task.existing_table_ids.clone(),
378            compression_algorithm: pb_compact_task.compression_algorithm,
379            target_file_size: pb_compact_task.target_file_size,
380            compaction_filter_mask: pb_compact_task.compaction_filter_mask,
381            table_options: pb_compact_task
382                .table_options
383                .iter()
384                .map(|(table_id, v)| (*table_id, *v))
385                .collect(),
386            current_epoch_time: pb_compact_task.current_epoch_time,
387            target_sub_level_id: pb_compact_task.target_sub_level_id,
388            task_type: PbTaskType::try_from(pb_compact_task.task_type).unwrap(),
389            split_by_state_table: pb_compact_task.split_by_state_table,
390            split_weight_by_vnode: pb_compact_task.split_weight_by_vnode,
391            table_vnode_partition: pb_compact_task
392                .table_vnode_partition
393                .iter()
394                .map(|(table_id, v)| (*table_id, *v))
395                .collect(),
396            pk_prefix_table_watermarks,
397            non_pk_prefix_table_watermarks,
398            value_table_watermarks,
399            table_schemas: pb_compact_task
400                .table_schemas
401                .iter()
402                .map(|(table_id, v)| (*table_id, v.clone()))
403                .collect(),
404            max_sub_compaction: pb_compact_task.max_sub_compaction,
405            compaction_group_version_id: pb_compact_task.compaction_group_version_id,
406            max_kv_count_for_xor16: pb_compact_task.max_kv_count_for_xor16,
407        }
408    }
409}
410
411impl From<CompactTask> for PbCompactTask {
412    fn from(compact_task: CompactTask) -> Self {
413        #[expect(deprecated)]
414        Self {
415            input_ssts: compact_task
416                .input_ssts
417                .into_iter()
418                .map(|input_level| input_level.into())
419                .collect_vec(),
420            splits: compact_task
421                .splits
422                .into_iter()
423                .map(|keyrange| PbKeyRange {
424                    left: keyrange.left.into(),
425                    right: keyrange.right.into(),
426                    right_exclusive: keyrange.right_exclusive,
427                })
428                .collect_vec(),
429            sorted_output_ssts: compact_task
430                .sorted_output_ssts
431                .into_iter()
432                .map(|sst| sst.into())
433                .collect_vec(),
434            task_id: compact_task.task_id,
435            target_level: compact_task.target_level,
436            gc_delete_keys: compact_task.gc_delete_keys,
437            base_level: compact_task.base_level,
438            task_status: compact_task.task_status.into(),
439            compaction_group_id: compact_task.compaction_group_id,
440            existing_table_ids: compact_task.existing_table_ids.clone(),
441            compression_algorithm: compact_task.compression_algorithm,
442            target_file_size: compact_task.target_file_size,
443            compaction_filter_mask: compact_task.compaction_filter_mask,
444            table_options: compact_task.table_options.clone(),
445            current_epoch_time: compact_task.current_epoch_time,
446            target_sub_level_id: compact_task.target_sub_level_id,
447            task_type: compact_task.task_type.into(),
448            split_weight_by_vnode: compact_task.split_weight_by_vnode,
449            table_vnode_partition: compact_task.table_vnode_partition.clone(),
450            table_watermarks: compact_task
451                .pk_prefix_table_watermarks
452                .into_iter()
453                .chain(compact_task.non_pk_prefix_table_watermarks)
454                .chain(compact_task.value_table_watermarks)
455                .map(|(table_id, table_watermark)| (table_id, table_watermark.into()))
456                .collect(),
457            split_by_state_table: compact_task.split_by_state_table,
458            table_schemas: compact_task.table_schemas.clone(),
459            max_sub_compaction: compact_task.max_sub_compaction,
460            compaction_group_version_id: compact_task.compaction_group_version_id,
461            max_kv_count_for_xor16: compact_task.max_kv_count_for_xor16,
462        }
463    }
464}
465
466impl From<&CompactTask> for PbCompactTask {
467    fn from(compact_task: &CompactTask) -> Self {
468        #[expect(deprecated)]
469        Self {
470            input_ssts: compact_task
471                .input_ssts
472                .iter()
473                .map(|input_level| input_level.into())
474                .collect_vec(),
475            splits: compact_task
476                .splits
477                .iter()
478                .map(|keyrange| PbKeyRange {
479                    left: keyrange.left.to_vec(),
480                    right: keyrange.right.to_vec(),
481                    right_exclusive: keyrange.right_exclusive,
482                })
483                .collect_vec(),
484            sorted_output_ssts: compact_task
485                .sorted_output_ssts
486                .iter()
487                .map(|sst| sst.into())
488                .collect_vec(),
489            task_id: compact_task.task_id,
490            target_level: compact_task.target_level,
491            gc_delete_keys: compact_task.gc_delete_keys,
492            base_level: compact_task.base_level,
493            task_status: compact_task.task_status.into(),
494            compaction_group_id: compact_task.compaction_group_id,
495            existing_table_ids: compact_task.existing_table_ids.clone(),
496            compression_algorithm: compact_task.compression_algorithm,
497            target_file_size: compact_task.target_file_size,
498            compaction_filter_mask: compact_task.compaction_filter_mask,
499            table_options: compact_task.table_options.clone(),
500            current_epoch_time: compact_task.current_epoch_time,
501            target_sub_level_id: compact_task.target_sub_level_id,
502            task_type: compact_task.task_type.into(),
503            split_weight_by_vnode: compact_task.split_weight_by_vnode,
504            table_vnode_partition: compact_task.table_vnode_partition.clone(),
505            table_watermarks: compact_task
506                .pk_prefix_table_watermarks
507                .iter()
508                .chain(compact_task.non_pk_prefix_table_watermarks.iter())
509                .chain(compact_task.value_table_watermarks.iter())
510                .map(|(table_id, table_watermark)| (*table_id, table_watermark.into()))
511                .collect(),
512            split_by_state_table: compact_task.split_by_state_table,
513            table_schemas: compact_task.table_schemas.clone(),
514            max_sub_compaction: compact_task.max_sub_compaction,
515            compaction_group_version_id: compact_task.compaction_group_version_id,
516            max_kv_count_for_xor16: compact_task.max_kv_count_for_xor16,
517        }
518    }
519}
520
521#[derive(Clone, PartialEq, Default)]
522pub struct ValidationTask {
523    pub sst_infos: Vec<SstableInfo>,
524    pub sst_id_to_worker_id: HashMap<HummockSstableObjectId, WorkerId>,
525}
526
527impl From<PbValidationTask> for ValidationTask {
528    fn from(pb_validation_task: PbValidationTask) -> Self {
529        Self {
530            sst_infos: pb_validation_task
531                .sst_infos
532                .into_iter()
533                .map(SstableInfo::from)
534                .collect_vec(),
535            sst_id_to_worker_id: pb_validation_task.sst_id_to_worker_id.into_iter().collect(),
536        }
537    }
538}
539
540impl From<ValidationTask> for PbValidationTask {
541    fn from(validation_task: ValidationTask) -> Self {
542        Self {
543            sst_infos: validation_task
544                .sst_infos
545                .into_iter()
546                .map(|sst| sst.into())
547                .collect_vec(),
548            sst_id_to_worker_id: validation_task.sst_id_to_worker_id,
549        }
550    }
551}
552
553impl ValidationTask {
554    pub fn estimated_encode_len(&self) -> usize {
555        self.sst_infos
556            .iter()
557            .map(|sst| sst.estimated_encode_len())
558            .sum::<usize>()
559            + self.sst_id_to_worker_id.len() * (size_of::<u64>() + size_of::<u32>())
560            + size_of::<u64>()
561    }
562}
563
564#[derive(Clone, PartialEq, Default, Debug)]
565pub struct ReportTask {
566    pub table_stats_change: HashMap<TableId, PbTableStats>,
567    pub task_id: u64,
568    pub task_status: TaskStatus,
569    pub sorted_output_ssts: Vec<SstableInfo>,
570    pub object_timestamps: HashMap<HummockSstableObjectId, u64>,
571}
572
573impl From<PbReportTask> for ReportTask {
574    fn from(value: PbReportTask) -> Self {
575        Self {
576            table_stats_change: value.table_stats_change.clone(),
577            task_id: value.task_id,
578            task_status: PbTaskStatus::try_from(value.task_status).unwrap(),
579            sorted_output_ssts: value
580                .sorted_output_ssts
581                .into_iter()
582                .map(SstableInfo::from)
583                .collect_vec(),
584            object_timestamps: value.object_timestamps,
585        }
586    }
587}
588
589impl From<ReportTask> for PbReportTask {
590    fn from(value: ReportTask) -> Self {
591        Self {
592            table_stats_change: value.table_stats_change.clone(),
593            task_id: value.task_id,
594            task_status: value.task_status.into(),
595            sorted_output_ssts: value
596                .sorted_output_ssts
597                .into_iter()
598                .map(|sst| sst.into())
599                .collect_vec(),
600            object_timestamps: value.object_timestamps,
601        }
602    }
603}