risingwave_meta/hummock/manager/compaction/
compact_task_builder.rs1use std::collections::{BTreeMap, HashMap, HashSet};
16use std::sync::Arc;
17
18use itertools::Itertools;
19use risingwave_common::catalog::TableId;
20use risingwave_hummock_sdk::compact_task::CompactTask;
21use risingwave_hummock_sdk::key_range::KeyRange;
22use risingwave_hummock_sdk::table_watermark::{TableWatermarks, WatermarkSerdeType};
23use risingwave_hummock_sdk::{CompactionGroupId, HummockCompactionTaskId};
24use risingwave_pb::hummock::compact_task::TaskStatus;
25use risingwave_pb::hummock::{CompactionConfig, TableOption, TableSchema};
26
27use crate::hummock::compaction::CompactionTask as PickedCompactionTask;
28
29pub(super) struct CompactTaskBuildContext {
30 pub(super) task_id: HummockCompactionTaskId,
31 pub(super) compaction_group_id: CompactionGroupId,
32 pub(super) compaction_group_version_id: u64,
33 pub(super) existing_table_ids: Vec<TableId>,
34 pub(super) table_options: BTreeMap<TableId, TableOption>,
35 pub(super) is_target_level_last: bool,
36 pub(super) compaction_config: Arc<CompactionConfig>,
37 pub(super) current_epoch_time: u64,
38}
39
40pub(super) fn build_base_compact_task(
41 picked_task: PickedCompactionTask,
42 context: CompactTaskBuildContext,
43) -> (CompactTask, Vec<TableId>) {
44 let target_level_id = picked_task.input.target_level as u32;
45 let input_ssts = picked_task.input.input_levels;
46 let compact_table_ids = input_ssts
47 .iter()
48 .flat_map(|level| level.table_infos.iter())
49 .flat_map(|sst| sst.table_ids.iter().copied())
50 .sorted()
51 .dedup()
52 .collect_vec();
53 let compact_table_id_set: HashSet<_> = compact_table_ids.iter().copied().collect();
54 let mut table_options = context.table_options;
55 retain_table_options(&mut table_options, &compact_table_id_set);
56 let compression_algorithm = match picked_task.compression_algorithm.as_str() {
57 "Lz4" => 1,
58 "Zstd" => 2,
59 _ => 0,
60 };
61 let compaction_config = context.compaction_config.as_ref();
62
63 (
64 CompactTask {
65 input_ssts,
66 splits: vec![KeyRange::inf()],
67 sorted_output_ssts: vec![],
68 task_id: context.task_id,
69 target_level: target_level_id,
70 gc_delete_keys: context.is_target_level_last,
72 base_level: picked_task.base_level as u32,
73 task_status: TaskStatus::Pending,
74 compaction_group_id: context.compaction_group_id,
75 compaction_group_version_id: context.compaction_group_version_id,
76 existing_table_ids: context.existing_table_ids,
77 compression_algorithm,
78 target_file_size: picked_task.target_file_size,
79 table_options,
80 current_epoch_time: context.current_epoch_time,
81 compaction_filter_mask: compaction_config.compaction_filter_mask,
82 target_sub_level_id: picked_task.input.target_sub_level_id,
83 task_type: picked_task.compaction_task_type,
84 split_weight_by_vnode: picked_task.input.vnode_partition_count,
85 max_sub_compaction: compaction_config.max_sub_compaction,
86 max_kv_count_for_xor16: compaction_config.max_kv_count_for_xor16,
87 max_vnode_key_range_bytes: compaction_config.max_vnode_key_range_bytes,
88 ..Default::default()
89 },
90 compact_table_ids,
91 )
92}
93
94pub(super) fn attach_compact_task_table_metadata(
95 compact_task: &mut CompactTask,
96 compact_table_ids: &[TableId],
97 table_watermarks: BTreeMap<TableId, TableWatermarks>,
98 all_versioned_table_schemas: &HashMap<TableId, Vec<i32>>,
99) {
100 let mut pk_prefix_table_watermarks = BTreeMap::default();
101 let mut non_pk_prefix_table_watermarks = BTreeMap::default();
102 let mut value_table_watermarks = BTreeMap::default();
103 for (table_id, watermark) in table_watermarks {
104 match watermark.watermark_type {
105 WatermarkSerdeType::PkPrefix => {
106 pk_prefix_table_watermarks.insert(table_id, watermark);
107 }
108 WatermarkSerdeType::NonPkPrefix => {
109 non_pk_prefix_table_watermarks.insert(table_id, watermark);
110 }
111 WatermarkSerdeType::Value => {
112 value_table_watermarks.insert(table_id, watermark);
113 }
114 }
115 }
116
117 compact_task.pk_prefix_table_watermarks = pk_prefix_table_watermarks;
118 compact_task.non_pk_prefix_table_watermarks = non_pk_prefix_table_watermarks;
119 compact_task.value_table_watermarks = value_table_watermarks;
120 compact_task.table_schemas = compact_table_ids
121 .iter()
122 .filter_map(|table_id| {
123 all_versioned_table_schemas.get(table_id).map(|column_ids| {
124 (
125 *table_id,
126 TableSchema {
127 column_ids: column_ids.clone(),
128 },
129 )
130 })
131 })
132 .collect();
133}
134
135fn retain_table_options(
136 table_options: &mut BTreeMap<TableId, TableOption>,
137 table_id_set: &HashSet<TableId>,
138) {
139 table_options.retain(|table_id, _| table_id_set.contains(table_id));
140}
141
#[cfg(test)]
mod tests {
    use super::*;

    /// Options for tables outside the given id set must be dropped, and the
    /// survivors must keep their ascending key order.
    #[test]
    fn test_retain_table_options() {
        let mut table_options: BTreeMap<TableId, TableOption> = [1, 2, 3]
            .into_iter()
            .map(|id| (TableId::new(id), TableOption::default()))
            .collect();
        let compact_table_id_set: HashSet<TableId> =
            [1, 3].into_iter().map(|id| TableId::new(id)).collect();

        retain_table_options(&mut table_options, &compact_table_id_set);

        let remaining: Vec<TableId> = table_options.keys().copied().collect();
        let expected = vec![TableId::new(1), TableId::new(3)];
        assert_eq!(remaining, expected);
    }
}