risingwave_meta/hummock/manager/compaction/
compact_task_builder.rs1use std::collections::{BTreeMap, HashMap, HashSet};
16use std::sync::Arc;
17
18use itertools::Itertools;
19use risingwave_common::catalog::TableId;
20use risingwave_hummock_sdk::compact_task::CompactTask;
21use risingwave_hummock_sdk::key_range::KeyRange;
22use risingwave_hummock_sdk::table_watermark::{TableWatermarks, WatermarkSerdeType};
23use risingwave_hummock_sdk::{CompactionGroupId, HummockCompactionTaskId};
24use risingwave_pb::hummock::compact_task::TaskStatus;
25use risingwave_pb::hummock::{CompactionConfig, TableOption, TableSchema};
26
27use crate::hummock::compaction::CompactionTask as PickedCompactionTask;
28
29pub(super) struct CompactTaskBuildContext {
30 pub(super) task_id: HummockCompactionTaskId,
31 pub(super) compaction_group_id: CompactionGroupId,
32 pub(super) compaction_group_version_id: u64,
33 pub(super) existing_table_ids: Vec<TableId>,
34 pub(super) table_options: BTreeMap<TableId, TableOption>,
35 pub(super) is_target_level_last: bool,
36 pub(super) compaction_config: Arc<CompactionConfig>,
37 pub(super) current_epoch_time: u64,
38}
39
40pub(super) fn build_base_compact_task(
41 picked_task: PickedCompactionTask,
42 context: CompactTaskBuildContext,
43) -> (CompactTask, Vec<TableId>) {
44 let target_level_id = picked_task.input.target_level as u32;
45 let input_ssts = picked_task.input.input_levels;
46 let compact_table_ids = input_ssts
47 .iter()
48 .flat_map(|level| level.table_infos.iter())
49 .flat_map(|sst| sst.table_ids.iter().copied())
50 .sorted()
51 .dedup()
52 .collect_vec();
53 let compact_table_id_set: HashSet<_> = compact_table_ids.iter().copied().collect();
54 let mut table_options = context.table_options;
55 retain_table_options(&mut table_options, &compact_table_id_set);
56 let compression_algorithm = match picked_task.compression_algorithm.as_str() {
57 "Lz4" => 1,
58 "Zstd" => 2,
59 _ => 0,
60 };
61 let compaction_config = context.compaction_config.as_ref();
62
63 (
64 CompactTask {
65 input_ssts,
66 splits: vec![KeyRange::inf()],
67 sorted_output_ssts: vec![],
68 task_id: context.task_id,
69 target_level: target_level_id,
70 gc_delete_keys: context.is_target_level_last,
72 base_level: picked_task.base_level as u32,
73 task_status: TaskStatus::Pending,
74 compaction_group_id: context.compaction_group_id,
75 compaction_group_version_id: context.compaction_group_version_id,
76 existing_table_ids: context.existing_table_ids,
77 compression_algorithm,
78 target_file_size: picked_task.target_file_size,
79 table_options,
80 current_epoch_time: context.current_epoch_time,
81 compaction_filter_mask: compaction_config.compaction_filter_mask,
82 target_sub_level_id: picked_task.input.target_sub_level_id,
83 task_type: picked_task.compaction_task_type,
84 split_weight_by_vnode: picked_task.input.vnode_partition_count,
85 max_sub_compaction: compaction_config.max_sub_compaction,
86 blocked_xor_filter_kv_count_threshold: compaction_config.max_kv_count_for_xor16,
87 max_vnode_key_range_bytes: compaction_config.max_vnode_key_range_bytes,
88 sstable_filter_kind: picked_task.sstable_filter_kind,
89 sstable_filter_layout: picked_task.sstable_filter_layout,
90 ..Default::default()
91 },
92 compact_table_ids,
93 )
94}
95
96pub(super) fn attach_compact_task_table_metadata(
97 compact_task: &mut CompactTask,
98 compact_table_ids: &[TableId],
99 table_watermarks: BTreeMap<TableId, TableWatermarks>,
100 all_versioned_table_schemas: &HashMap<TableId, Vec<i32>>,
101) {
102 let mut pk_prefix_table_watermarks = BTreeMap::default();
103 let mut non_pk_prefix_table_watermarks = BTreeMap::default();
104 let mut value_table_watermarks = BTreeMap::default();
105 for (table_id, watermark) in table_watermarks {
106 match watermark.watermark_type {
107 WatermarkSerdeType::PkPrefix => {
108 pk_prefix_table_watermarks.insert(table_id, watermark);
109 }
110 WatermarkSerdeType::NonPkPrefix => {
111 non_pk_prefix_table_watermarks.insert(table_id, watermark);
112 }
113 WatermarkSerdeType::Value => {
114 value_table_watermarks.insert(table_id, watermark);
115 }
116 }
117 }
118
119 compact_task.pk_prefix_table_watermarks = pk_prefix_table_watermarks;
120 compact_task.non_pk_prefix_table_watermarks = non_pk_prefix_table_watermarks;
121 compact_task.value_table_watermarks = value_table_watermarks;
122 compact_task.table_schemas = compact_table_ids
123 .iter()
124 .filter_map(|table_id| {
125 all_versioned_table_schemas.get(table_id).map(|column_ids| {
126 (
127 *table_id,
128 TableSchema {
129 column_ids: column_ids.clone(),
130 },
131 )
132 })
133 })
134 .collect();
135}
136
137fn retain_table_options(
138 table_options: &mut BTreeMap<TableId, TableOption>,
139 table_id_set: &HashSet<TableId>,
140) {
141 table_options.retain(|table_id, _| table_id_set.contains(table_id));
142}
143
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a default-valued options map keyed by the given raw table ids.
    fn options_for(ids: &[u32]) -> BTreeMap<TableId, TableOption> {
        ids.iter()
            .map(|&id| (TableId::new(id), TableOption::default()))
            .collect()
    }

    /// Builds a `HashSet<TableId>` from raw table ids.
    fn id_set(ids: &[u32]) -> HashSet<TableId> {
        ids.iter().copied().map(TableId::new).collect()
    }

    #[test]
    fn test_retain_table_options() {
        let mut table_options = options_for(&[1, 2, 3]);

        retain_table_options(&mut table_options, &id_set(&[1, 3]));

        assert_eq!(
            table_options.keys().copied().collect::<Vec<_>>(),
            vec![TableId::new(1), TableId::new(3)]
        );
    }

    #[test]
    fn test_retain_table_options_empty_set_drops_everything() {
        let mut table_options = options_for(&[1, 2]);

        retain_table_options(&mut table_options, &id_set(&[]));

        assert!(table_options.is_empty());
    }

    #[test]
    fn test_retain_table_options_ignores_ids_without_options() {
        // Ids in the set that have no option entry must not create new entries.
        let mut table_options = options_for(&[2]);

        retain_table_options(&mut table_options, &id_set(&[1, 2, 3]));

        assert_eq!(
            table_options.keys().copied().collect::<Vec<_>>(),
            vec![TableId::new(2)]
        );
    }
}