1use std::collections::{BTreeMap, HashMap, HashSet};
16use std::mem::size_of;
17
18use itertools::Itertools;
19use risingwave_pb::hummock::compact_task::{PbTaskStatus, PbTaskType, TaskStatus, TaskType};
20use risingwave_pb::hummock::subscribe_compaction_event_request::PbReportTask;
21use risingwave_pb::hummock::{
22 LevelType, PbCompactTask, PbKeyRange, PbTableOption, PbTableSchema, PbTableStats,
23 PbValidationTask,
24};
25
26use crate::HummockSstableObjectId;
27use crate::compaction_group::StateTableId;
28use crate::key_range::KeyRange;
29use crate::level::InputLevel;
30use crate::sstable_info::SstableInfo;
31use crate::table_watermark::{TableWatermarks, WatermarkSerdeType};
32
/// In-memory form of a hummock compaction task, convertible to and from `PbCompactTask`.
///
/// Unlike the protobuf message, table watermarks are held in two maps split by
/// watermark type (`pk_prefix_table_watermarks` / `non_pk_prefix_table_watermarks`).
#[derive(Clone, PartialEq, Default, Debug)]
pub struct CompactTask {
    /// Input levels read by the task; each level's SSTs are in its `table_infos`.
    pub input_ssts: Vec<InputLevel>,
    /// Key ranges mirrored from the protobuf `splits` field.
    /// NOTE(review): presumably the ranges the work is divided into — confirm.
    pub splits: Vec<KeyRange>,
    /// Output SSTs of the task (the name suggests they are kept in sorted order).
    pub sorted_output_ssts: Vec<SstableInfo>,
    /// Task identifier.
    pub task_id: u64,
    /// Target level; `is_trivial_move_task` compares it with the second input level.
    pub target_level: u32,
    /// NOTE(review): presumably whether delete keys may be dropped — confirm.
    pub gc_delete_keys: bool,
    /// NOTE(review): presumably the base level of the group's LSM tree — confirm.
    pub base_level: u32,
    /// Task status; the `From<PbCompactTask>` conversions decode it with `TaskStatus::try_from`.
    pub task_status: PbTaskStatus,
    /// Compaction group the task belongs to.
    pub compaction_group_id: u64,
    /// Group version id recorded in the task; `is_expired` returns `true` when it differs
    /// from the caller's expected version id.
    pub compaction_group_version_id: u64,
    /// Table ids treated as still existing; `is_trivial_reclaim` and
    /// `build_compact_table_ids` test SST table ids against this set.
    pub existing_table_ids: Vec<u32>,
    /// NOTE(review): numeric compression algorithm code — meaning defined elsewhere.
    pub compression_algorithm: u32,
    /// NOTE(review): presumably the desired output file size — confirm units.
    pub target_file_size: u64,
    /// NOTE(review): bit mask of enabled compaction filters — confirm.
    pub compaction_filter_mask: u32,
    /// Per-table options (key presumably the table id); `contains_ttl` inspects
    /// `retention_seconds`.
    pub table_options: BTreeMap<u32, PbTableOption>,
    /// NOTE(review): epoch time captured when the task was created — confirm.
    pub current_epoch_time: u64,
    /// NOTE(review): target sub-level id — confirm semantics.
    pub target_sub_level_id: u64,
    /// Task type; `Dynamic`/`Emergency` may be trivial moves, and `VnodeWatermark` is
    /// always a trivial reclaim.
    pub task_type: PbTaskType,
    /// NOTE(review): presumably whether output is split at state-table boundaries — confirm.
    pub split_by_state_table: bool,
    /// NOTE(review): vnode split weight — confirm semantics.
    pub split_weight_by_vnode: u32,
    /// Per-table vnode partition count (key presumably the table id).
    pub table_vnode_partition: BTreeMap<u32, u32>,
    /// Watermarks whose type is `WatermarkSerdeType::PkPrefix`.
    pub pk_prefix_table_watermarks: BTreeMap<u32, TableWatermarks>,

    /// Watermarks of every other type; merged with the PK-prefix map into the protobuf
    /// `table_watermarks` field on conversion.
    pub non_pk_prefix_table_watermarks: BTreeMap<u32, TableWatermarks>,

    /// Table schemas (key presumably the table id).
    pub table_schemas: BTreeMap<u32, PbTableSchema>,

    /// NOTE(review): upper bound on sub-compactions for this task — confirm.
    pub max_sub_compaction: u32,
}
80
81impl CompactTask {
82 pub fn estimated_encode_len(&self) -> usize {
83 self.input_ssts
84 .iter()
85 .map(|input_level| input_level.estimated_encode_len())
86 .sum::<usize>()
87 + self
88 .splits
89 .iter()
90 .map(|split| split.left.len() + split.right.len() + size_of::<bool>())
91 .sum::<usize>()
92 + size_of::<u64>()
93 + self
94 .sorted_output_ssts
95 .iter()
96 .map(|sst| sst.estimated_encode_len())
97 .sum::<usize>()
98 + size_of::<u64>()
99 + size_of::<u32>()
100 + size_of::<bool>()
101 + size_of::<u32>()
102 + size_of::<i32>()
103 + size_of::<u64>()
104 + self.existing_table_ids.len() * size_of::<u32>()
105 + size_of::<u32>()
106 + size_of::<u64>()
107 + size_of::<u32>()
108 + self.table_options.len() * size_of::<u64>()
109 + size_of::<u64>()
110 + size_of::<u64>()
111 + size_of::<i32>()
112 + size_of::<bool>()
113 + size_of::<u32>()
114 + self.table_vnode_partition.len() * size_of::<u64>()
115 + self
116 .pk_prefix_table_watermarks
117 .values()
118 .map(|table_watermark| size_of::<u32>() + table_watermark.estimated_encode_len())
119 .sum::<usize>()
120 + self
121 .non_pk_prefix_table_watermarks
122 .values()
123 .map(|table_watermark| size_of::<u32>() + table_watermark.estimated_encode_len())
124 .sum::<usize>()
125 }
126
127 pub fn is_trivial_move_task(&self) -> bool {
128 if self.task_type != TaskType::Dynamic && self.task_type != TaskType::Emergency {
129 return false;
130 }
131
132 if self.input_ssts.len() != 2 || self.input_ssts[0].level_type != LevelType::Nonoverlapping
133 {
134 return false;
135 }
136
137 if self.input_ssts[0].level_idx == self.input_ssts[1].level_idx
139 && self.input_ssts[0].level_idx > 0
140 {
141 return false;
142 }
143
144 if self.input_ssts[1].level_idx == self.target_level
145 && self.input_ssts[1].table_infos.is_empty()
146 {
147 return true;
148 }
149
150 false
151 }
152
153 pub fn is_trivial_reclaim(&self) -> bool {
154 if self.task_type == TaskType::VnodeWatermark {
156 return true;
157 }
158 let exist_table_ids = HashSet::<u32>::from_iter(self.existing_table_ids.clone());
159 self.input_ssts.iter().all(|level| {
160 level.table_infos.iter().all(|sst| {
161 sst.table_ids
162 .iter()
163 .all(|table_id| !exist_table_ids.contains(table_id))
164 })
165 })
166 }
167}
168
169impl CompactTask {
170 pub fn contains_ttl(&self) -> bool {
172 self.table_options
173 .iter()
174 .any(|(_, table_option)| table_option.retention_seconds.is_some_and(|ttl| ttl > 0))
175 }
176
177 pub fn contains_range_tombstone(&self) -> bool {
179 self.input_ssts
180 .iter()
181 .flat_map(|level| level.table_infos.iter())
182 .any(|sst| sst.range_tombstone_count > 0)
183 }
184
185 pub fn contains_split_sst(&self) -> bool {
187 self.input_ssts
188 .iter()
189 .flat_map(|level| level.table_infos.iter())
190 .any(|sst| sst.sst_id != sst.object_id)
191 }
192
193 pub fn get_table_ids_from_input_ssts(&self) -> impl Iterator<Item = StateTableId> + use<> {
194 self.input_ssts
195 .iter()
196 .flat_map(|level| level.table_infos.iter())
197 .flat_map(|sst| sst.table_ids.clone())
198 .sorted()
199 .dedup()
200 }
201
202 pub fn build_compact_table_ids(&self) -> Vec<StateTableId> {
204 let existing_table_ids: HashSet<u32> = HashSet::from_iter(self.existing_table_ids.clone());
205 self.get_table_ids_from_input_ssts()
206 .filter(|table_id| existing_table_ids.contains(table_id))
207 .collect()
208 }
209
210 pub fn is_expired(&self, compaction_group_version_id_expected: u64) -> bool {
211 is_compaction_task_expired(
212 self.compaction_group_version_id,
213 compaction_group_version_id_expected,
214 )
215 }
216}
217
/// Reports whether a compaction task has gone stale.
///
/// A task counts as expired exactly when the compaction-group version id recorded in it
/// differs from the version id the caller currently expects.
pub fn is_compaction_task_expired(
    compaction_group_version_id_in_task: u64,
    compaction_group_version_id_expected: u64,
) -> bool {
    let still_current =
        compaction_group_version_id_in_task == compaction_group_version_id_expected;
    !still_current
}
224
225impl From<PbCompactTask> for CompactTask {
226 fn from(pb_compact_task: PbCompactTask) -> Self {
227 let (pk_prefix_table_watermarks, non_pk_prefix_table_watermarks) = pb_compact_task
228 .table_watermarks
229 .into_iter()
230 .map(|(table_id, pb_table_watermark)| {
231 let table_watermark = TableWatermarks::from(pb_table_watermark);
232 (table_id, table_watermark)
233 })
234 .partition(|(_table_id, table_watermarke)| {
235 matches!(
236 table_watermarke.watermark_type,
237 WatermarkSerdeType::PkPrefix
238 )
239 });
240
241 #[expect(deprecated)]
242 Self {
243 input_ssts: pb_compact_task
244 .input_ssts
245 .into_iter()
246 .map(InputLevel::from)
247 .collect_vec(),
248 splits: pb_compact_task
249 .splits
250 .into_iter()
251 .map(|pb_keyrange| KeyRange {
252 left: pb_keyrange.left.into(),
253 right: pb_keyrange.right.into(),
254 right_exclusive: pb_keyrange.right_exclusive,
255 })
256 .collect_vec(),
257 sorted_output_ssts: pb_compact_task
258 .sorted_output_ssts
259 .into_iter()
260 .map(SstableInfo::from)
261 .collect_vec(),
262 task_id: pb_compact_task.task_id,
263 target_level: pb_compact_task.target_level,
264 gc_delete_keys: pb_compact_task.gc_delete_keys,
265 base_level: pb_compact_task.base_level,
266 task_status: TaskStatus::try_from(pb_compact_task.task_status).unwrap(),
267 compaction_group_id: pb_compact_task.compaction_group_id,
268 existing_table_ids: pb_compact_task.existing_table_ids.clone(),
269 compression_algorithm: pb_compact_task.compression_algorithm,
270 target_file_size: pb_compact_task.target_file_size,
271 compaction_filter_mask: pb_compact_task.compaction_filter_mask,
272 table_options: pb_compact_task.table_options.clone(),
273 current_epoch_time: pb_compact_task.current_epoch_time,
274 target_sub_level_id: pb_compact_task.target_sub_level_id,
275 task_type: PbTaskType::try_from(pb_compact_task.task_type).unwrap(),
276 split_by_state_table: pb_compact_task.split_by_state_table,
277 split_weight_by_vnode: pb_compact_task.split_weight_by_vnode,
278 table_vnode_partition: pb_compact_task.table_vnode_partition.clone(),
279 pk_prefix_table_watermarks,
280 non_pk_prefix_table_watermarks,
281 table_schemas: pb_compact_task.table_schemas,
282 max_sub_compaction: pb_compact_task.max_sub_compaction,
283 compaction_group_version_id: pb_compact_task.compaction_group_version_id,
284 }
285 }
286}
287
288impl From<&PbCompactTask> for CompactTask {
289 fn from(pb_compact_task: &PbCompactTask) -> Self {
290 let (pk_prefix_table_watermarks, non_pk_prefix_table_watermarks) = pb_compact_task
291 .table_watermarks
292 .iter()
293 .map(|(table_id, pb_table_watermark)| {
294 let table_watermark = TableWatermarks::from(pb_table_watermark);
295 (*table_id, table_watermark)
296 })
297 .partition(|(_table_id, table_watermarke)| {
298 matches!(
299 table_watermarke.watermark_type,
300 WatermarkSerdeType::PkPrefix
301 )
302 });
303
304 #[expect(deprecated)]
305 Self {
306 input_ssts: pb_compact_task
307 .input_ssts
308 .iter()
309 .map(InputLevel::from)
310 .collect_vec(),
311 splits: pb_compact_task
312 .splits
313 .iter()
314 .map(|pb_keyrange| KeyRange {
315 left: pb_keyrange.left.clone().into(),
316 right: pb_keyrange.right.clone().into(),
317 right_exclusive: pb_keyrange.right_exclusive,
318 })
319 .collect_vec(),
320 sorted_output_ssts: pb_compact_task
321 .sorted_output_ssts
322 .iter()
323 .map(SstableInfo::from)
324 .collect_vec(),
325 task_id: pb_compact_task.task_id,
326 target_level: pb_compact_task.target_level,
327 gc_delete_keys: pb_compact_task.gc_delete_keys,
328 base_level: pb_compact_task.base_level,
329 task_status: TaskStatus::try_from(pb_compact_task.task_status).unwrap(),
330 compaction_group_id: pb_compact_task.compaction_group_id,
331 existing_table_ids: pb_compact_task.existing_table_ids.clone(),
332 compression_algorithm: pb_compact_task.compression_algorithm,
333 target_file_size: pb_compact_task.target_file_size,
334 compaction_filter_mask: pb_compact_task.compaction_filter_mask,
335 table_options: pb_compact_task.table_options.clone(),
336 current_epoch_time: pb_compact_task.current_epoch_time,
337 target_sub_level_id: pb_compact_task.target_sub_level_id,
338 task_type: PbTaskType::try_from(pb_compact_task.task_type).unwrap(),
339 split_by_state_table: pb_compact_task.split_by_state_table,
340 split_weight_by_vnode: pb_compact_task.split_weight_by_vnode,
341 table_vnode_partition: pb_compact_task.table_vnode_partition.clone(),
342 pk_prefix_table_watermarks,
343 non_pk_prefix_table_watermarks,
344 table_schemas: pb_compact_task.table_schemas.clone(),
345 max_sub_compaction: pb_compact_task.max_sub_compaction,
346 compaction_group_version_id: pb_compact_task.compaction_group_version_id,
347 }
348 }
349}
350
351impl From<CompactTask> for PbCompactTask {
352 fn from(compact_task: CompactTask) -> Self {
353 #[expect(deprecated)]
354 Self {
355 input_ssts: compact_task
356 .input_ssts
357 .into_iter()
358 .map(|input_level| input_level.into())
359 .collect_vec(),
360 splits: compact_task
361 .splits
362 .into_iter()
363 .map(|keyrange| PbKeyRange {
364 left: keyrange.left.into(),
365 right: keyrange.right.into(),
366 right_exclusive: keyrange.right_exclusive,
367 })
368 .collect_vec(),
369 sorted_output_ssts: compact_task
370 .sorted_output_ssts
371 .into_iter()
372 .map(|sst| sst.into())
373 .collect_vec(),
374 task_id: compact_task.task_id,
375 target_level: compact_task.target_level,
376 gc_delete_keys: compact_task.gc_delete_keys,
377 base_level: compact_task.base_level,
378 task_status: compact_task.task_status.into(),
379 compaction_group_id: compact_task.compaction_group_id,
380 existing_table_ids: compact_task.existing_table_ids.clone(),
381 compression_algorithm: compact_task.compression_algorithm,
382 target_file_size: compact_task.target_file_size,
383 compaction_filter_mask: compact_task.compaction_filter_mask,
384 table_options: compact_task.table_options.clone(),
385 current_epoch_time: compact_task.current_epoch_time,
386 target_sub_level_id: compact_task.target_sub_level_id,
387 task_type: compact_task.task_type.into(),
388 split_weight_by_vnode: compact_task.split_weight_by_vnode,
389 table_vnode_partition: compact_task.table_vnode_partition.clone(),
390 table_watermarks: compact_task
391 .pk_prefix_table_watermarks
392 .into_iter()
393 .chain(compact_task.non_pk_prefix_table_watermarks)
394 .map(|(table_id, table_watermark)| (table_id, table_watermark.into()))
395 .collect(),
396 split_by_state_table: compact_task.split_by_state_table,
397 table_schemas: compact_task.table_schemas.clone(),
398 max_sub_compaction: compact_task.max_sub_compaction,
399 compaction_group_version_id: compact_task.compaction_group_version_id,
400 }
401 }
402}
403
404impl From<&CompactTask> for PbCompactTask {
405 fn from(compact_task: &CompactTask) -> Self {
406 #[expect(deprecated)]
407 Self {
408 input_ssts: compact_task
409 .input_ssts
410 .iter()
411 .map(|input_level| input_level.into())
412 .collect_vec(),
413 splits: compact_task
414 .splits
415 .iter()
416 .map(|keyrange| PbKeyRange {
417 left: keyrange.left.to_vec(),
418 right: keyrange.right.to_vec(),
419 right_exclusive: keyrange.right_exclusive,
420 })
421 .collect_vec(),
422 sorted_output_ssts: compact_task
423 .sorted_output_ssts
424 .iter()
425 .map(|sst| sst.into())
426 .collect_vec(),
427 task_id: compact_task.task_id,
428 target_level: compact_task.target_level,
429 gc_delete_keys: compact_task.gc_delete_keys,
430 base_level: compact_task.base_level,
431 task_status: compact_task.task_status.into(),
432 compaction_group_id: compact_task.compaction_group_id,
433 existing_table_ids: compact_task.existing_table_ids.clone(),
434 compression_algorithm: compact_task.compression_algorithm,
435 target_file_size: compact_task.target_file_size,
436 compaction_filter_mask: compact_task.compaction_filter_mask,
437 table_options: compact_task.table_options.clone(),
438 current_epoch_time: compact_task.current_epoch_time,
439 target_sub_level_id: compact_task.target_sub_level_id,
440 task_type: compact_task.task_type.into(),
441 split_weight_by_vnode: compact_task.split_weight_by_vnode,
442 table_vnode_partition: compact_task.table_vnode_partition.clone(),
443 table_watermarks: compact_task
444 .pk_prefix_table_watermarks
445 .iter()
446 .chain(compact_task.non_pk_prefix_table_watermarks.iter())
447 .map(|(table_id, table_watermark)| (*table_id, table_watermark.into()))
448 .collect(),
449 split_by_state_table: compact_task.split_by_state_table,
450 table_schemas: compact_task.table_schemas.clone(),
451 max_sub_compaction: compact_task.max_sub_compaction,
452 compaction_group_version_id: compact_task.compaction_group_version_id,
453 }
454 }
455}
456
457#[derive(Clone, PartialEq, Default)]
458pub struct ValidationTask {
459 pub sst_infos: Vec<SstableInfo>,
460 pub sst_id_to_worker_id: HashMap<u64, u32>,
461}
462
463impl From<PbValidationTask> for ValidationTask {
464 fn from(pb_validation_task: PbValidationTask) -> Self {
465 Self {
466 sst_infos: pb_validation_task
467 .sst_infos
468 .into_iter()
469 .map(SstableInfo::from)
470 .collect_vec(),
471 sst_id_to_worker_id: pb_validation_task.sst_id_to_worker_id.clone(),
472 }
473 }
474}
475
476impl From<ValidationTask> for PbValidationTask {
477 fn from(validation_task: ValidationTask) -> Self {
478 Self {
479 sst_infos: validation_task
480 .sst_infos
481 .into_iter()
482 .map(|sst| sst.into())
483 .collect_vec(),
484 sst_id_to_worker_id: validation_task.sst_id_to_worker_id.clone(),
485 }
486 }
487}
488
489impl ValidationTask {
490 pub fn estimated_encode_len(&self) -> usize {
491 self.sst_infos
492 .iter()
493 .map(|sst| sst.estimated_encode_len())
494 .sum::<usize>()
495 + self.sst_id_to_worker_id.len() * (size_of::<u64>() + size_of::<u32>())
496 + size_of::<u64>()
497 }
498}
499
/// In-memory form of a `PbReportTask`.
#[derive(Clone, PartialEq, Default, Debug)]
pub struct ReportTask {
    /// Table stats changes (key presumably the table id — confirm).
    pub table_stats_change: HashMap<u32, PbTableStats>,
    /// Id of the task being reported.
    pub task_id: u64,
    /// Reported status; decoded with `PbTaskStatus::try_from` in `From<PbReportTask>`.
    pub task_status: TaskStatus,
    /// Output SSTs being reported.
    pub sorted_output_ssts: Vec<SstableInfo>,
    /// Per-object `u64` values.
    /// NOTE(review): presumably a timestamp for each SST object — confirm units.
    pub object_timestamps: HashMap<HummockSstableObjectId, u64>,
}
508
509impl From<PbReportTask> for ReportTask {
510 fn from(value: PbReportTask) -> Self {
511 Self {
512 table_stats_change: value.table_stats_change.clone(),
513 task_id: value.task_id,
514 task_status: PbTaskStatus::try_from(value.task_status).unwrap(),
515 sorted_output_ssts: value
516 .sorted_output_ssts
517 .into_iter()
518 .map(SstableInfo::from)
519 .collect_vec(),
520 object_timestamps: value.object_timestamps,
521 }
522 }
523}
524
525impl From<ReportTask> for PbReportTask {
526 fn from(value: ReportTask) -> Self {
527 Self {
528 table_stats_change: value.table_stats_change.clone(),
529 task_id: value.task_id,
530 task_status: value.task_status.into(),
531 sorted_output_ssts: value
532 .sorted_output_ssts
533 .into_iter()
534 .map(|sst| sst.into())
535 .collect_vec(),
536 object_timestamps: value.object_timestamps,
537 }
538 }
539}