risingwave_hummock_sdk/
compact_task.rs

1// Copyright 2024 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, HashMap, HashSet};
16use std::mem::size_of;
17
18use itertools::Itertools;
19use risingwave_common::catalog::TableId;
20use risingwave_pb::hummock::compact_task::{PbTaskStatus, PbTaskType, TaskStatus, TaskType};
21use risingwave_pb::hummock::subscribe_compaction_event_request::PbReportTask;
22use risingwave_pb::hummock::{
23    LevelType, PbCompactTask, PbKeyRange, PbTableOption, PbTableSchema, PbTableStats,
24    PbValidationTask,
25};
26
27use crate::HummockSstableObjectId;
28use crate::compaction_group::StateTableId;
29use crate::key_range::KeyRange;
30use crate::level::InputLevel;
31use crate::sstable_info::SstableInfo;
32use crate::table_watermark::{TableWatermarks, WatermarkSerdeType};
33
/// In-memory representation of a compaction task.
///
/// It is converted to and from the protobuf [`PbCompactTask`] by the `From` impls in this file.
#[derive(Clone, PartialEq, Default, Debug)]
pub struct CompactTask {
    /// SSTs to be compacted, which will be removed from LSM after compaction
    pub input_ssts: Vec<InputLevel>,
    /// In ideal case, the compaction will generate `splits.len()` tables which have key range
    /// corresponding to that in `splits`, respectively
    pub splits: Vec<KeyRange>,
    /// compaction output, which will be added to `target_level` of LSM after compaction
    pub sorted_output_ssts: Vec<SstableInfo>,
    /// task id assigned by hummock storage service
    pub task_id: u64,
    /// compaction output will be added to `target_level` of LSM after compaction
    pub target_level: u32,
    // NOTE(review): presumably controls whether delete markers may be dropped — confirm.
    pub gc_delete_keys: bool,
    /// Lbase in LSM
    pub base_level: u32,
    /// Status of the task, decoded from the protobuf `task_status` integer.
    pub task_status: PbTaskStatus,
    /// compaction group the task belongs to.
    pub compaction_group_id: u64,
    /// Version id of the compaction group when the compaction task is created.
    /// `is_expired` compares it against the currently expected version id.
    pub compaction_group_version_id: u64,
    /// `existing_table_ids` for compaction drop key.
    /// Also consulted by `is_trivial_reclaim` and `build_compact_table_ids`.
    pub existing_table_ids: Vec<TableId>,
    pub compression_algorithm: u32,
    pub target_file_size: u64,
    pub compaction_filter_mask: u32,
    /// Per-table options; `contains_ttl` inspects their `retention_seconds`.
    pub table_options: BTreeMap<TableId, PbTableOption>,
    pub current_epoch_time: u64,
    pub target_sub_level_id: u64,
    /// The kind of compaction task (e.g. `Dynamic`, `Emergency`, `VnodeWatermark`).
    pub task_type: PbTaskType,
    /// Deprecated. use `table_vnode_partition` instead;
    pub split_by_state_table: bool,
    /// Compaction needs to cut the state table every time 1/weight of vnodes in the table have been processed.
    /// Deprecated. use `table_vnode_partition` instead;
    pub split_weight_by_vnode: u32,
    pub table_vnode_partition: BTreeMap<TableId, u32>,
    /// The table watermark of any table id. In compaction we only use the table watermarks on safe epoch,
    /// so we only need to include the table watermarks on safe epoch to reduce the size of metadata.
    ///
    /// This map holds the watermarks whose serde type is `WatermarkSerdeType::PkPrefix`.
    pub pk_prefix_table_watermarks: BTreeMap<TableId, TableWatermarks>,

    /// Table watermarks whose serde type is `WatermarkSerdeType::NonPkPrefix`.
    pub non_pk_prefix_table_watermarks: BTreeMap<TableId, TableWatermarks>,
    /// Table watermarks whose serde type is `WatermarkSerdeType::Value`.
    pub value_table_watermarks: BTreeMap<TableId, TableWatermarks>,

    pub table_schemas: BTreeMap<TableId, PbTableSchema>,

    pub max_sub_compaction: u32,

    /// Optional threshold handed to `filter_utils::is_kv_count_too_large_for_xor16` by
    /// `should_use_block_based_filter`.
    pub max_kv_count_for_xor16: Option<u64>,
}
84
85impl CompactTask {
86    pub fn estimated_encode_len(&self) -> usize {
87        self.input_ssts
88            .iter()
89            .map(|input_level| input_level.estimated_encode_len())
90            .sum::<usize>()
91            + self
92                .splits
93                .iter()
94                .map(|split| split.left.len() + split.right.len() + size_of::<bool>())
95                .sum::<usize>()
96            + size_of::<u64>()
97            + self
98                .sorted_output_ssts
99                .iter()
100                .map(|sst| sst.estimated_encode_len())
101                .sum::<usize>()
102            + size_of::<u64>()
103            + size_of::<u32>()
104            + size_of::<bool>()
105            + size_of::<u32>()
106            + size_of::<i32>()
107            + size_of::<u64>()
108            + self.existing_table_ids.len() * size_of::<u32>()
109            + size_of::<u32>()
110            + size_of::<u64>()
111            + size_of::<u32>()
112            + self.table_options.len() * size_of::<u64>()
113            + size_of::<u64>()
114            + size_of::<u64>()
115            + size_of::<i32>()
116            + size_of::<bool>()
117            + size_of::<u32>()
118            + self.table_vnode_partition.len() * size_of::<u64>()
119            + self
120                .pk_prefix_table_watermarks
121                .values()
122                .map(|table_watermark| size_of::<u32>() + table_watermark.estimated_encode_len())
123                .sum::<usize>()
124            + self
125                .non_pk_prefix_table_watermarks
126                .values()
127                .map(|table_watermark| size_of::<u32>() + table_watermark.estimated_encode_len())
128                .sum::<usize>()
129            + self
130                .value_table_watermarks
131                .values()
132                .map(|table_watermark| size_of::<u32>() + table_watermark.estimated_encode_len())
133                .sum::<usize>()
134    }
135
136    pub fn is_trivial_move_task(&self) -> bool {
137        if self.task_type != TaskType::Dynamic && self.task_type != TaskType::Emergency {
138            return false;
139        }
140
141        if self.input_ssts.len() != 2 || self.input_ssts[0].level_type != LevelType::Nonoverlapping
142        {
143            return false;
144        }
145
146        // it may be a manual compaction task
147        if self.input_ssts[0].level_idx == self.input_ssts[1].level_idx
148            && self.input_ssts[0].level_idx > 0
149        {
150            return false;
151        }
152
153        if self.input_ssts[1].level_idx == self.target_level
154            && self.input_ssts[1].table_infos.is_empty()
155        {
156            return true;
157        }
158
159        false
160    }
161
162    pub fn is_trivial_reclaim(&self) -> bool {
163        // Currently all VnodeWatermark tasks are trivial reclaim.
164        if self.task_type == TaskType::VnodeWatermark {
165            return true;
166        }
167        let exist_table_ids =
168            HashSet::<TableId>::from_iter(self.existing_table_ids.iter().copied());
169        self.input_ssts.iter().all(|level| {
170            level.table_infos.iter().all(|sst| {
171                sst.table_ids
172                    .iter()
173                    .all(|table_id| !exist_table_ids.contains(table_id))
174            })
175        })
176    }
177}
178
179impl CompactTask {
180    // The compact task may need to reclaim key with TTL
181    pub fn contains_ttl(&self) -> bool {
182        self.table_options
183            .iter()
184            .any(|(_, table_option)| table_option.retention_seconds.is_some_and(|ttl| ttl > 0))
185    }
186
187    // The compact task may need to reclaim key with range tombstone
188    pub fn contains_range_tombstone(&self) -> bool {
189        self.input_ssts
190            .iter()
191            .flat_map(|level| level.table_infos.iter())
192            .any(|sst| sst.range_tombstone_count > 0)
193    }
194
195    // The compact task may need to reclaim key with split sst
196    pub fn contains_split_sst(&self) -> bool {
197        self.input_ssts
198            .iter()
199            .flat_map(|level| level.table_infos.iter())
200            .any(|sst| sst.sst_id.inner() != sst.object_id.inner())
201    }
202
203    pub fn get_table_ids_from_input_ssts(&self) -> impl Iterator<Item = StateTableId> + use<> {
204        self.input_ssts
205            .iter()
206            .flat_map(|level| level.table_infos.iter())
207            .flat_map(|sst| sst.table_ids.clone())
208            .sorted()
209            .dedup()
210    }
211
212    // filter the table-id that in existing_table_ids with the table-id in compact-task
213    pub fn build_compact_table_ids(&self) -> Vec<StateTableId> {
214        let existing_table_ids: HashSet<TableId> =
215            HashSet::from_iter(self.existing_table_ids.clone());
216        self.get_table_ids_from_input_ssts()
217            .filter(|table_id| existing_table_ids.contains(table_id))
218            .collect()
219    }
220
221    pub fn is_expired(&self, compaction_group_version_id_expected: u64) -> bool {
222        is_compaction_task_expired(
223            self.compaction_group_version_id,
224            compaction_group_version_id_expected,
225        )
226    }
227
228    /// Determines whether to use block-based filter for this compaction task.
229    /// Returns true if the total key count exceeds the configured threshold.
230    pub fn should_use_block_based_filter(&self) -> bool {
231        let kv_count = self
232            .input_ssts
233            .iter()
234            .flat_map(|level| level.table_infos.iter())
235            .map(|sst| sst.total_key_count)
236            .sum::<u64>();
237
238        crate::filter_utils::is_kv_count_too_large_for_xor16(kv_count, self.max_kv_count_for_xor16)
239    }
240}
241
242fn split_watermark_serde_types(
243    pb_compact_task: &PbCompactTask,
244) -> (
245    BTreeMap<TableId, TableWatermarks>,
246    BTreeMap<TableId, TableWatermarks>,
247    BTreeMap<TableId, TableWatermarks>,
248) {
249    let mut pk_prefix_table_watermarks = BTreeMap::default();
250    let mut non_pk_prefix_table_watermarks = BTreeMap::default();
251    let mut value_table_watermarks = BTreeMap::default();
252    for (table_id, pbwatermark) in &pb_compact_task.table_watermarks {
253        let watermark = TableWatermarks::from(pbwatermark);
254        match watermark.watermark_type {
255            WatermarkSerdeType::PkPrefix => {
256                pk_prefix_table_watermarks.insert(*table_id, watermark);
257            }
258            WatermarkSerdeType::NonPkPrefix => {
259                non_pk_prefix_table_watermarks.insert(*table_id, watermark);
260            }
261            WatermarkSerdeType::Value => {
262                value_table_watermarks.insert(*table_id, watermark);
263            }
264        }
265    }
266    (
267        pk_prefix_table_watermarks,
268        non_pk_prefix_table_watermarks,
269        value_table_watermarks,
270    )
271}
272
/// Returns `true` when the compaction group version id recorded in a task differs from the
/// expected one, i.e. the task is considered expired.
pub fn is_compaction_task_expired(
    compaction_group_version_id_in_task: u64,
    compaction_group_version_id_expected: u64,
) -> bool {
    let versions_match =
        compaction_group_version_id_in_task == compaction_group_version_id_expected;
    !versions_match
}
279
280impl From<PbCompactTask> for CompactTask {
281    fn from(pb_compact_task: PbCompactTask) -> Self {
282        let (pk_prefix_table_watermarks, non_pk_prefix_table_watermarks, value_table_watermarks) =
283            split_watermark_serde_types(&pb_compact_task);
284        #[expect(deprecated)]
285        Self {
286            input_ssts: pb_compact_task
287                .input_ssts
288                .into_iter()
289                .map(InputLevel::from)
290                .collect_vec(),
291            splits: pb_compact_task
292                .splits
293                .into_iter()
294                .map(|pb_keyrange| KeyRange {
295                    left: pb_keyrange.left.into(),
296                    right: pb_keyrange.right.into(),
297                    right_exclusive: pb_keyrange.right_exclusive,
298                })
299                .collect_vec(),
300            sorted_output_ssts: pb_compact_task
301                .sorted_output_ssts
302                .into_iter()
303                .map(SstableInfo::from)
304                .collect_vec(),
305            task_id: pb_compact_task.task_id,
306            target_level: pb_compact_task.target_level,
307            gc_delete_keys: pb_compact_task.gc_delete_keys,
308            base_level: pb_compact_task.base_level,
309            task_status: TaskStatus::try_from(pb_compact_task.task_status).unwrap(),
310            compaction_group_id: pb_compact_task.compaction_group_id,
311            existing_table_ids: pb_compact_task.existing_table_ids.clone(),
312            compression_algorithm: pb_compact_task.compression_algorithm,
313            target_file_size: pb_compact_task.target_file_size,
314            compaction_filter_mask: pb_compact_task.compaction_filter_mask,
315            table_options: pb_compact_task
316                .table_options
317                .iter()
318                .map(|(table_id, v)| (*table_id, *v))
319                .collect(),
320            current_epoch_time: pb_compact_task.current_epoch_time,
321            target_sub_level_id: pb_compact_task.target_sub_level_id,
322            task_type: PbTaskType::try_from(pb_compact_task.task_type).unwrap(),
323            split_by_state_table: pb_compact_task.split_by_state_table,
324            split_weight_by_vnode: pb_compact_task.split_weight_by_vnode,
325            table_vnode_partition: pb_compact_task
326                .table_vnode_partition
327                .iter()
328                .map(|(table_id, v)| (*table_id, *v))
329                .collect(),
330            pk_prefix_table_watermarks,
331            non_pk_prefix_table_watermarks,
332            value_table_watermarks,
333            table_schemas: pb_compact_task
334                .table_schemas
335                .iter()
336                .map(|(table_id, v)| (*table_id, v.clone()))
337                .collect(),
338            max_sub_compaction: pb_compact_task.max_sub_compaction,
339            compaction_group_version_id: pb_compact_task.compaction_group_version_id,
340            max_kv_count_for_xor16: pb_compact_task.max_kv_count_for_xor16,
341        }
342    }
343}
344
345impl From<&PbCompactTask> for CompactTask {
346    fn from(pb_compact_task: &PbCompactTask) -> Self {
347        let (pk_prefix_table_watermarks, non_pk_prefix_table_watermarks, value_table_watermarks) =
348            split_watermark_serde_types(pb_compact_task);
349        #[expect(deprecated)]
350        Self {
351            input_ssts: pb_compact_task
352                .input_ssts
353                .iter()
354                .map(InputLevel::from)
355                .collect_vec(),
356            splits: pb_compact_task
357                .splits
358                .iter()
359                .map(|pb_keyrange| KeyRange {
360                    left: pb_keyrange.left.clone().into(),
361                    right: pb_keyrange.right.clone().into(),
362                    right_exclusive: pb_keyrange.right_exclusive,
363                })
364                .collect_vec(),
365            sorted_output_ssts: pb_compact_task
366                .sorted_output_ssts
367                .iter()
368                .map(SstableInfo::from)
369                .collect_vec(),
370            task_id: pb_compact_task.task_id,
371            target_level: pb_compact_task.target_level,
372            gc_delete_keys: pb_compact_task.gc_delete_keys,
373            base_level: pb_compact_task.base_level,
374            task_status: TaskStatus::try_from(pb_compact_task.task_status).unwrap(),
375            compaction_group_id: pb_compact_task.compaction_group_id,
376            existing_table_ids: pb_compact_task.existing_table_ids.clone(),
377            compression_algorithm: pb_compact_task.compression_algorithm,
378            target_file_size: pb_compact_task.target_file_size,
379            compaction_filter_mask: pb_compact_task.compaction_filter_mask,
380            table_options: pb_compact_task
381                .table_options
382                .iter()
383                .map(|(table_id, v)| (*table_id, *v))
384                .collect(),
385            current_epoch_time: pb_compact_task.current_epoch_time,
386            target_sub_level_id: pb_compact_task.target_sub_level_id,
387            task_type: PbTaskType::try_from(pb_compact_task.task_type).unwrap(),
388            split_by_state_table: pb_compact_task.split_by_state_table,
389            split_weight_by_vnode: pb_compact_task.split_weight_by_vnode,
390            table_vnode_partition: pb_compact_task
391                .table_vnode_partition
392                .iter()
393                .map(|(table_id, v)| (*table_id, *v))
394                .collect(),
395            pk_prefix_table_watermarks,
396            non_pk_prefix_table_watermarks,
397            value_table_watermarks,
398            table_schemas: pb_compact_task
399                .table_schemas
400                .iter()
401                .map(|(table_id, v)| (*table_id, v.clone()))
402                .collect(),
403            max_sub_compaction: pb_compact_task.max_sub_compaction,
404            compaction_group_version_id: pb_compact_task.compaction_group_version_id,
405            max_kv_count_for_xor16: pb_compact_task.max_kv_count_for_xor16,
406        }
407    }
408}
409
410impl From<CompactTask> for PbCompactTask {
411    fn from(compact_task: CompactTask) -> Self {
412        #[expect(deprecated)]
413        Self {
414            input_ssts: compact_task
415                .input_ssts
416                .into_iter()
417                .map(|input_level| input_level.into())
418                .collect_vec(),
419            splits: compact_task
420                .splits
421                .into_iter()
422                .map(|keyrange| PbKeyRange {
423                    left: keyrange.left.into(),
424                    right: keyrange.right.into(),
425                    right_exclusive: keyrange.right_exclusive,
426                })
427                .collect_vec(),
428            sorted_output_ssts: compact_task
429                .sorted_output_ssts
430                .into_iter()
431                .map(|sst| sst.into())
432                .collect_vec(),
433            task_id: compact_task.task_id,
434            target_level: compact_task.target_level,
435            gc_delete_keys: compact_task.gc_delete_keys,
436            base_level: compact_task.base_level,
437            task_status: compact_task.task_status.into(),
438            compaction_group_id: compact_task.compaction_group_id,
439            existing_table_ids: compact_task.existing_table_ids.clone(),
440            compression_algorithm: compact_task.compression_algorithm,
441            target_file_size: compact_task.target_file_size,
442            compaction_filter_mask: compact_task.compaction_filter_mask,
443            table_options: compact_task.table_options.clone(),
444            current_epoch_time: compact_task.current_epoch_time,
445            target_sub_level_id: compact_task.target_sub_level_id,
446            task_type: compact_task.task_type.into(),
447            split_weight_by_vnode: compact_task.split_weight_by_vnode,
448            table_vnode_partition: compact_task.table_vnode_partition.clone(),
449            table_watermarks: compact_task
450                .pk_prefix_table_watermarks
451                .into_iter()
452                .chain(compact_task.non_pk_prefix_table_watermarks)
453                .chain(compact_task.value_table_watermarks)
454                .map(|(table_id, table_watermark)| (table_id, table_watermark.into()))
455                .collect(),
456            split_by_state_table: compact_task.split_by_state_table,
457            table_schemas: compact_task.table_schemas.clone(),
458            max_sub_compaction: compact_task.max_sub_compaction,
459            compaction_group_version_id: compact_task.compaction_group_version_id,
460            max_kv_count_for_xor16: compact_task.max_kv_count_for_xor16,
461        }
462    }
463}
464
465impl From<&CompactTask> for PbCompactTask {
466    fn from(compact_task: &CompactTask) -> Self {
467        #[expect(deprecated)]
468        Self {
469            input_ssts: compact_task
470                .input_ssts
471                .iter()
472                .map(|input_level| input_level.into())
473                .collect_vec(),
474            splits: compact_task
475                .splits
476                .iter()
477                .map(|keyrange| PbKeyRange {
478                    left: keyrange.left.to_vec(),
479                    right: keyrange.right.to_vec(),
480                    right_exclusive: keyrange.right_exclusive,
481                })
482                .collect_vec(),
483            sorted_output_ssts: compact_task
484                .sorted_output_ssts
485                .iter()
486                .map(|sst| sst.into())
487                .collect_vec(),
488            task_id: compact_task.task_id,
489            target_level: compact_task.target_level,
490            gc_delete_keys: compact_task.gc_delete_keys,
491            base_level: compact_task.base_level,
492            task_status: compact_task.task_status.into(),
493            compaction_group_id: compact_task.compaction_group_id,
494            existing_table_ids: compact_task.existing_table_ids.clone(),
495            compression_algorithm: compact_task.compression_algorithm,
496            target_file_size: compact_task.target_file_size,
497            compaction_filter_mask: compact_task.compaction_filter_mask,
498            table_options: compact_task.table_options.clone(),
499            current_epoch_time: compact_task.current_epoch_time,
500            target_sub_level_id: compact_task.target_sub_level_id,
501            task_type: compact_task.task_type.into(),
502            split_weight_by_vnode: compact_task.split_weight_by_vnode,
503            table_vnode_partition: compact_task.table_vnode_partition.clone(),
504            table_watermarks: compact_task
505                .pk_prefix_table_watermarks
506                .iter()
507                .chain(compact_task.non_pk_prefix_table_watermarks.iter())
508                .chain(compact_task.value_table_watermarks.iter())
509                .map(|(table_id, table_watermark)| (*table_id, table_watermark.into()))
510                .collect(),
511            split_by_state_table: compact_task.split_by_state_table,
512            table_schemas: compact_task.table_schemas.clone(),
513            max_sub_compaction: compact_task.max_sub_compaction,
514            compaction_group_version_id: compact_task.compaction_group_version_id,
515            max_kv_count_for_xor16: compact_task.max_kv_count_for_xor16,
516        }
517    }
518}
519
/// In-memory counterpart of the protobuf [`PbValidationTask`]; see the `From` impls in this
/// file for the conversions in both directions.
#[derive(Clone, PartialEq, Default)]
pub struct ValidationTask {
    /// SST infos carried by the task.
    pub sst_infos: Vec<SstableInfo>,
    /// Maps an SST object id to a worker id.
    // NOTE(review): presumably the worker that produced the object — confirm against callers.
    pub sst_id_to_worker_id: HashMap<HummockSstableObjectId, u32>,
}
525
526impl From<PbValidationTask> for ValidationTask {
527    fn from(pb_validation_task: PbValidationTask) -> Self {
528        Self {
529            sst_infos: pb_validation_task
530                .sst_infos
531                .into_iter()
532                .map(SstableInfo::from)
533                .collect_vec(),
534            sst_id_to_worker_id: pb_validation_task
535                .sst_id_to_worker_id
536                .into_iter()
537                .map(|(object_id, worker_id)| (object_id.into(), worker_id))
538                .collect(),
539        }
540    }
541}
542
543impl From<ValidationTask> for PbValidationTask {
544    fn from(validation_task: ValidationTask) -> Self {
545        Self {
546            sst_infos: validation_task
547                .sst_infos
548                .into_iter()
549                .map(|sst| sst.into())
550                .collect_vec(),
551            sst_id_to_worker_id: validation_task
552                .sst_id_to_worker_id
553                .into_iter()
554                .map(|(object_id, worker_id)| (object_id.inner(), worker_id))
555                .collect(),
556        }
557    }
558}
559
560impl ValidationTask {
561    pub fn estimated_encode_len(&self) -> usize {
562        self.sst_infos
563            .iter()
564            .map(|sst| sst.estimated_encode_len())
565            .sum::<usize>()
566            + self.sst_id_to_worker_id.len() * (size_of::<u64>() + size_of::<u32>())
567            + size_of::<u64>()
568    }
569}
570
/// In-memory counterpart of the protobuf [`PbReportTask`]; see the `From` impls in this file
/// for the conversions in both directions.
#[derive(Clone, PartialEq, Default, Debug)]
pub struct ReportTask {
    /// Per-table statistics deltas, keyed by table id.
    pub table_stats_change: HashMap<TableId, PbTableStats>,
    /// Id of the task being reported.
    pub task_id: u64,
    /// Status of the task, decoded from the protobuf `task_status` integer.
    pub task_status: TaskStatus,
    /// Output SSTs of the task.
    // NOTE(review): the name suggests these are ordered; the ordering is not enforced here.
    pub sorted_output_ssts: Vec<SstableInfo>,
    /// A `u64` timestamp per SST object id.
    // NOTE(review): unit and clock source are not visible in this file — confirm.
    pub object_timestamps: HashMap<HummockSstableObjectId, u64>,
}
579
580impl From<PbReportTask> for ReportTask {
581    fn from(value: PbReportTask) -> Self {
582        Self {
583            table_stats_change: value.table_stats_change.clone(),
584            task_id: value.task_id,
585            task_status: PbTaskStatus::try_from(value.task_status).unwrap(),
586            sorted_output_ssts: value
587                .sorted_output_ssts
588                .into_iter()
589                .map(SstableInfo::from)
590                .collect_vec(),
591            object_timestamps: value
592                .object_timestamps
593                .into_iter()
594                .map(|(object_id, timestamp)| (object_id.into(), timestamp))
595                .collect(),
596        }
597    }
598}
599
600impl From<ReportTask> for PbReportTask {
601    fn from(value: ReportTask) -> Self {
602        Self {
603            table_stats_change: value.table_stats_change.clone(),
604            task_id: value.task_id,
605            task_status: value.task_status.into(),
606            sorted_output_ssts: value
607                .sorted_output_ssts
608                .into_iter()
609                .map(|sst| sst.into())
610                .collect_vec(),
611            object_timestamps: value
612                .object_timestamps
613                .into_iter()
614                .map(|(object_id, timestamp)| (object_id.inner(), timestamp))
615                .collect(),
616        }
617    }
618}