1use std::collections::{BTreeMap, HashMap, HashSet};
16use std::mem::size_of;
17
18use itertools::Itertools;
19use risingwave_common::catalog::TableId;
20use risingwave_pb::hummock::compact_task::{PbTaskStatus, PbTaskType, TaskStatus, TaskType};
21use risingwave_pb::hummock::subscribe_compaction_event_request::PbReportTask;
22use risingwave_pb::hummock::{
23 LevelType, PbCompactTask, PbKeyRange, PbTableOption, PbTableSchema, PbTableStats,
24 PbValidationTask,
25};
26
27use crate::HummockSstableObjectId;
28use crate::compaction_group::StateTableId;
29use crate::key_range::KeyRange;
30use crate::level::InputLevel;
31use crate::sstable_info::SstableInfo;
32use crate::table_watermark::{TableWatermarks, WatermarkSerdeType};
33
/// In-memory representation of a Hummock compaction task.
///
/// Converted to and from `PbCompactTask` by the `From` impls in this module. On the
/// way in, the protobuf `table_watermarks` map is split by `watermark_type` into
/// `pk_prefix_table_watermarks` and `non_pk_prefix_table_watermarks`; on the way out
/// the two maps are merged back into one.
#[derive(Clone, PartialEq, Default, Debug)]
pub struct CompactTask {
    /// Input levels of the task. Each entry carries a level index, a level type and
    /// the SSTs of that level (`table_infos`).
    pub input_ssts: Vec<InputLevel>,
    /// Key ranges attached to the task.
    /// NOTE(review): presumably the key ranges the compaction is split into — confirm.
    pub splits: Vec<KeyRange>,
    /// Output SSTs of the task (kept sorted, per the field name).
    /// NOTE(review): presumably filled in once the task has been executed — confirm.
    pub sorted_output_ssts: Vec<SstableInfo>,
    /// Identifier of the task.
    pub task_id: u64,
    /// Target level of the task; `is_trivial_move_task` requires the second input
    /// level to be this level.
    pub target_level: u32,
    /// NOTE(review): presumably controls whether delete keys may be dropped — confirm.
    pub gc_delete_keys: bool,
    pub base_level: u32,
    /// Status of the task.
    pub task_status: PbTaskStatus,
    /// Compaction group the task belongs to.
    pub compaction_group_id: u64,
    /// Compaction group version the task was created against; `is_expired` reports
    /// the task as expired whenever this differs from the expected version.
    pub compaction_group_version_id: u64,
    /// Table ids that still exist. Input-SST table ids missing from this list make a
    /// task reclaimable in `is_trivial_reclaim` and are dropped by
    /// `build_compact_table_ids`.
    pub existing_table_ids: Vec<TableId>,
    pub compression_algorithm: u32,
    /// Target size of output files. NOTE(review): unit presumably bytes — confirm.
    pub target_file_size: u64,
    pub compaction_filter_mask: u32,
    /// Per-table options; `contains_ttl` inspects their `retention_seconds`.
    pub table_options: BTreeMap<TableId, PbTableOption>,
    pub current_epoch_time: u64,
    pub target_sub_level_id: u64,
    /// Kind of task. `is_trivial_move_task` only accepts `Dynamic` and `Emergency`;
    /// `is_trivial_reclaim` treats every `VnodeWatermark` task as trivial.
    pub task_type: PbTaskType,
    pub split_by_state_table: bool,
    pub split_weight_by_vnode: u32,
    /// Per-table `u32` partition setting.
    /// NOTE(review): presumably a vnode partition count — confirm.
    pub table_vnode_partition: BTreeMap<TableId, u32>,
    /// Table watermarks whose `watermark_type` is `WatermarkSerdeType::PkPrefix`.
    pub pk_prefix_table_watermarks: BTreeMap<TableId, TableWatermarks>,

    /// Table watermarks of every other `watermark_type`.
    pub non_pk_prefix_table_watermarks: BTreeMap<TableId, TableWatermarks>,

    /// Per-table schemas.
    pub table_schemas: BTreeMap<TableId, PbTableSchema>,

    /// NOTE(review): presumably an upper bound on sub-compactions for this task — confirm.
    pub max_sub_compaction: u32,
}
81
82impl CompactTask {
83 pub fn estimated_encode_len(&self) -> usize {
84 self.input_ssts
85 .iter()
86 .map(|input_level| input_level.estimated_encode_len())
87 .sum::<usize>()
88 + self
89 .splits
90 .iter()
91 .map(|split| split.left.len() + split.right.len() + size_of::<bool>())
92 .sum::<usize>()
93 + size_of::<u64>()
94 + self
95 .sorted_output_ssts
96 .iter()
97 .map(|sst| sst.estimated_encode_len())
98 .sum::<usize>()
99 + size_of::<u64>()
100 + size_of::<u32>()
101 + size_of::<bool>()
102 + size_of::<u32>()
103 + size_of::<i32>()
104 + size_of::<u64>()
105 + self.existing_table_ids.len() * size_of::<u32>()
106 + size_of::<u32>()
107 + size_of::<u64>()
108 + size_of::<u32>()
109 + self.table_options.len() * size_of::<u64>()
110 + size_of::<u64>()
111 + size_of::<u64>()
112 + size_of::<i32>()
113 + size_of::<bool>()
114 + size_of::<u32>()
115 + self.table_vnode_partition.len() * size_of::<u64>()
116 + self
117 .pk_prefix_table_watermarks
118 .values()
119 .map(|table_watermark| size_of::<u32>() + table_watermark.estimated_encode_len())
120 .sum::<usize>()
121 + self
122 .non_pk_prefix_table_watermarks
123 .values()
124 .map(|table_watermark| size_of::<u32>() + table_watermark.estimated_encode_len())
125 .sum::<usize>()
126 }
127
128 pub fn is_trivial_move_task(&self) -> bool {
129 if self.task_type != TaskType::Dynamic && self.task_type != TaskType::Emergency {
130 return false;
131 }
132
133 if self.input_ssts.len() != 2 || self.input_ssts[0].level_type != LevelType::Nonoverlapping
134 {
135 return false;
136 }
137
138 if self.input_ssts[0].level_idx == self.input_ssts[1].level_idx
140 && self.input_ssts[0].level_idx > 0
141 {
142 return false;
143 }
144
145 if self.input_ssts[1].level_idx == self.target_level
146 && self.input_ssts[1].table_infos.is_empty()
147 {
148 return true;
149 }
150
151 false
152 }
153
154 pub fn is_trivial_reclaim(&self) -> bool {
155 if self.task_type == TaskType::VnodeWatermark {
157 return true;
158 }
159 let exist_table_ids =
160 HashSet::<TableId>::from_iter(self.existing_table_ids.iter().copied());
161 self.input_ssts.iter().all(|level| {
162 level.table_infos.iter().all(|sst| {
163 sst.table_ids
164 .iter()
165 .all(|table_id| !exist_table_ids.contains(table_id))
166 })
167 })
168 }
169}
170
171impl CompactTask {
172 pub fn contains_ttl(&self) -> bool {
174 self.table_options
175 .iter()
176 .any(|(_, table_option)| table_option.retention_seconds.is_some_and(|ttl| ttl > 0))
177 }
178
179 pub fn contains_range_tombstone(&self) -> bool {
181 self.input_ssts
182 .iter()
183 .flat_map(|level| level.table_infos.iter())
184 .any(|sst| sst.range_tombstone_count > 0)
185 }
186
187 pub fn contains_split_sst(&self) -> bool {
189 self.input_ssts
190 .iter()
191 .flat_map(|level| level.table_infos.iter())
192 .any(|sst| sst.sst_id.inner() != sst.object_id.inner())
193 }
194
195 pub fn get_table_ids_from_input_ssts(&self) -> impl Iterator<Item = StateTableId> + use<> {
196 self.input_ssts
197 .iter()
198 .flat_map(|level| level.table_infos.iter())
199 .flat_map(|sst| sst.table_ids.clone())
200 .sorted()
201 .dedup()
202 }
203
204 pub fn build_compact_table_ids(&self) -> Vec<StateTableId> {
206 let existing_table_ids: HashSet<TableId> =
207 HashSet::from_iter(self.existing_table_ids.clone());
208 self.get_table_ids_from_input_ssts()
209 .filter(|table_id| existing_table_ids.contains(table_id))
210 .collect()
211 }
212
213 pub fn is_expired(&self, compaction_group_version_id_expected: u64) -> bool {
214 is_compaction_task_expired(
215 self.compaction_group_version_id,
216 compaction_group_version_id_expected,
217 )
218 }
219}
220
/// Returns `true` when the compaction group version recorded in a task differs from
/// the expected version; any difference, in either direction, counts as expired.
pub fn is_compaction_task_expired(
    compaction_group_version_id_in_task: u64,
    compaction_group_version_id_expected: u64,
) -> bool {
    let versions_match = compaction_group_version_id_in_task == compaction_group_version_id_expected;
    !versions_match
}
227
228impl From<PbCompactTask> for CompactTask {
229 fn from(pb_compact_task: PbCompactTask) -> Self {
230 let (pk_prefix_table_watermarks, non_pk_prefix_table_watermarks) = pb_compact_task
231 .table_watermarks
232 .into_iter()
233 .map(|(table_id, pb_table_watermark)| {
234 let table_watermark = TableWatermarks::from(pb_table_watermark);
235 (table_id, table_watermark)
236 })
237 .partition(|(_table_id, table_watermarke)| {
238 matches!(
239 table_watermarke.watermark_type,
240 WatermarkSerdeType::PkPrefix
241 )
242 });
243
244 #[expect(deprecated)]
245 Self {
246 input_ssts: pb_compact_task
247 .input_ssts
248 .into_iter()
249 .map(InputLevel::from)
250 .collect_vec(),
251 splits: pb_compact_task
252 .splits
253 .into_iter()
254 .map(|pb_keyrange| KeyRange {
255 left: pb_keyrange.left.into(),
256 right: pb_keyrange.right.into(),
257 right_exclusive: pb_keyrange.right_exclusive,
258 })
259 .collect_vec(),
260 sorted_output_ssts: pb_compact_task
261 .sorted_output_ssts
262 .into_iter()
263 .map(SstableInfo::from)
264 .collect_vec(),
265 task_id: pb_compact_task.task_id,
266 target_level: pb_compact_task.target_level,
267 gc_delete_keys: pb_compact_task.gc_delete_keys,
268 base_level: pb_compact_task.base_level,
269 task_status: TaskStatus::try_from(pb_compact_task.task_status).unwrap(),
270 compaction_group_id: pb_compact_task.compaction_group_id,
271 existing_table_ids: pb_compact_task.existing_table_ids.clone(),
272 compression_algorithm: pb_compact_task.compression_algorithm,
273 target_file_size: pb_compact_task.target_file_size,
274 compaction_filter_mask: pb_compact_task.compaction_filter_mask,
275 table_options: pb_compact_task
276 .table_options
277 .iter()
278 .map(|(table_id, v)| (*table_id, *v))
279 .collect(),
280 current_epoch_time: pb_compact_task.current_epoch_time,
281 target_sub_level_id: pb_compact_task.target_sub_level_id,
282 task_type: PbTaskType::try_from(pb_compact_task.task_type).unwrap(),
283 split_by_state_table: pb_compact_task.split_by_state_table,
284 split_weight_by_vnode: pb_compact_task.split_weight_by_vnode,
285 table_vnode_partition: pb_compact_task
286 .table_vnode_partition
287 .iter()
288 .map(|(table_id, v)| (*table_id, *v))
289 .collect(),
290 pk_prefix_table_watermarks,
291 non_pk_prefix_table_watermarks,
292 table_schemas: pb_compact_task
293 .table_schemas
294 .iter()
295 .map(|(table_id, v)| (*table_id, v.clone()))
296 .collect(),
297 max_sub_compaction: pb_compact_task.max_sub_compaction,
298 compaction_group_version_id: pb_compact_task.compaction_group_version_id,
299 }
300 }
301}
302
303impl From<&PbCompactTask> for CompactTask {
304 fn from(pb_compact_task: &PbCompactTask) -> Self {
305 let (pk_prefix_table_watermarks, non_pk_prefix_table_watermarks) = pb_compact_task
306 .table_watermarks
307 .iter()
308 .map(|(table_id, pb_table_watermark)| {
309 let table_watermark = TableWatermarks::from(pb_table_watermark);
310 (*table_id, table_watermark)
311 })
312 .partition(|(_table_id, table_watermarke)| {
313 matches!(
314 table_watermarke.watermark_type,
315 WatermarkSerdeType::PkPrefix
316 )
317 });
318
319 #[expect(deprecated)]
320 Self {
321 input_ssts: pb_compact_task
322 .input_ssts
323 .iter()
324 .map(InputLevel::from)
325 .collect_vec(),
326 splits: pb_compact_task
327 .splits
328 .iter()
329 .map(|pb_keyrange| KeyRange {
330 left: pb_keyrange.left.clone().into(),
331 right: pb_keyrange.right.clone().into(),
332 right_exclusive: pb_keyrange.right_exclusive,
333 })
334 .collect_vec(),
335 sorted_output_ssts: pb_compact_task
336 .sorted_output_ssts
337 .iter()
338 .map(SstableInfo::from)
339 .collect_vec(),
340 task_id: pb_compact_task.task_id,
341 target_level: pb_compact_task.target_level,
342 gc_delete_keys: pb_compact_task.gc_delete_keys,
343 base_level: pb_compact_task.base_level,
344 task_status: TaskStatus::try_from(pb_compact_task.task_status).unwrap(),
345 compaction_group_id: pb_compact_task.compaction_group_id,
346 existing_table_ids: pb_compact_task.existing_table_ids.clone(),
347 compression_algorithm: pb_compact_task.compression_algorithm,
348 target_file_size: pb_compact_task.target_file_size,
349 compaction_filter_mask: pb_compact_task.compaction_filter_mask,
350 table_options: pb_compact_task
351 .table_options
352 .iter()
353 .map(|(table_id, v)| (*table_id, *v))
354 .collect(),
355 current_epoch_time: pb_compact_task.current_epoch_time,
356 target_sub_level_id: pb_compact_task.target_sub_level_id,
357 task_type: PbTaskType::try_from(pb_compact_task.task_type).unwrap(),
358 split_by_state_table: pb_compact_task.split_by_state_table,
359 split_weight_by_vnode: pb_compact_task.split_weight_by_vnode,
360 table_vnode_partition: pb_compact_task
361 .table_vnode_partition
362 .iter()
363 .map(|(table_id, v)| (*table_id, *v))
364 .collect(),
365 pk_prefix_table_watermarks,
366 non_pk_prefix_table_watermarks,
367 table_schemas: pb_compact_task
368 .table_schemas
369 .iter()
370 .map(|(table_id, v)| (*table_id, v.clone()))
371 .collect(),
372 max_sub_compaction: pb_compact_task.max_sub_compaction,
373 compaction_group_version_id: pb_compact_task.compaction_group_version_id,
374 }
375 }
376}
377
378impl From<CompactTask> for PbCompactTask {
379 fn from(compact_task: CompactTask) -> Self {
380 #[expect(deprecated)]
381 Self {
382 input_ssts: compact_task
383 .input_ssts
384 .into_iter()
385 .map(|input_level| input_level.into())
386 .collect_vec(),
387 splits: compact_task
388 .splits
389 .into_iter()
390 .map(|keyrange| PbKeyRange {
391 left: keyrange.left.into(),
392 right: keyrange.right.into(),
393 right_exclusive: keyrange.right_exclusive,
394 })
395 .collect_vec(),
396 sorted_output_ssts: compact_task
397 .sorted_output_ssts
398 .into_iter()
399 .map(|sst| sst.into())
400 .collect_vec(),
401 task_id: compact_task.task_id,
402 target_level: compact_task.target_level,
403 gc_delete_keys: compact_task.gc_delete_keys,
404 base_level: compact_task.base_level,
405 task_status: compact_task.task_status.into(),
406 compaction_group_id: compact_task.compaction_group_id,
407 existing_table_ids: compact_task.existing_table_ids.clone(),
408 compression_algorithm: compact_task.compression_algorithm,
409 target_file_size: compact_task.target_file_size,
410 compaction_filter_mask: compact_task.compaction_filter_mask,
411 table_options: compact_task.table_options.clone(),
412 current_epoch_time: compact_task.current_epoch_time,
413 target_sub_level_id: compact_task.target_sub_level_id,
414 task_type: compact_task.task_type.into(),
415 split_weight_by_vnode: compact_task.split_weight_by_vnode,
416 table_vnode_partition: compact_task.table_vnode_partition.clone(),
417 table_watermarks: compact_task
418 .pk_prefix_table_watermarks
419 .into_iter()
420 .chain(compact_task.non_pk_prefix_table_watermarks)
421 .map(|(table_id, table_watermark)| (table_id, table_watermark.into()))
422 .collect(),
423 split_by_state_table: compact_task.split_by_state_table,
424 table_schemas: compact_task.table_schemas.clone(),
425 max_sub_compaction: compact_task.max_sub_compaction,
426 compaction_group_version_id: compact_task.compaction_group_version_id,
427 }
428 }
429}
430
431impl From<&CompactTask> for PbCompactTask {
432 fn from(compact_task: &CompactTask) -> Self {
433 #[expect(deprecated)]
434 Self {
435 input_ssts: compact_task
436 .input_ssts
437 .iter()
438 .map(|input_level| input_level.into())
439 .collect_vec(),
440 splits: compact_task
441 .splits
442 .iter()
443 .map(|keyrange| PbKeyRange {
444 left: keyrange.left.to_vec(),
445 right: keyrange.right.to_vec(),
446 right_exclusive: keyrange.right_exclusive,
447 })
448 .collect_vec(),
449 sorted_output_ssts: compact_task
450 .sorted_output_ssts
451 .iter()
452 .map(|sst| sst.into())
453 .collect_vec(),
454 task_id: compact_task.task_id,
455 target_level: compact_task.target_level,
456 gc_delete_keys: compact_task.gc_delete_keys,
457 base_level: compact_task.base_level,
458 task_status: compact_task.task_status.into(),
459 compaction_group_id: compact_task.compaction_group_id,
460 existing_table_ids: compact_task.existing_table_ids.clone(),
461 compression_algorithm: compact_task.compression_algorithm,
462 target_file_size: compact_task.target_file_size,
463 compaction_filter_mask: compact_task.compaction_filter_mask,
464 table_options: compact_task.table_options.clone(),
465 current_epoch_time: compact_task.current_epoch_time,
466 target_sub_level_id: compact_task.target_sub_level_id,
467 task_type: compact_task.task_type.into(),
468 split_weight_by_vnode: compact_task.split_weight_by_vnode,
469 table_vnode_partition: compact_task.table_vnode_partition.clone(),
470 table_watermarks: compact_task
471 .pk_prefix_table_watermarks
472 .iter()
473 .chain(compact_task.non_pk_prefix_table_watermarks.iter())
474 .map(|(table_id, table_watermark)| (*table_id, table_watermark.into()))
475 .collect(),
476 split_by_state_table: compact_task.split_by_state_table,
477 table_schemas: compact_task.table_schemas.clone(),
478 max_sub_compaction: compact_task.max_sub_compaction,
479 compaction_group_version_id: compact_task.compaction_group_version_id,
480 }
481 }
482}
483
484#[derive(Clone, PartialEq, Default)]
485pub struct ValidationTask {
486 pub sst_infos: Vec<SstableInfo>,
487 pub sst_id_to_worker_id: HashMap<HummockSstableObjectId, u32>,
488}
489
490impl From<PbValidationTask> for ValidationTask {
491 fn from(pb_validation_task: PbValidationTask) -> Self {
492 Self {
493 sst_infos: pb_validation_task
494 .sst_infos
495 .into_iter()
496 .map(SstableInfo::from)
497 .collect_vec(),
498 sst_id_to_worker_id: pb_validation_task
499 .sst_id_to_worker_id
500 .into_iter()
501 .map(|(object_id, worker_id)| (object_id.into(), worker_id))
502 .collect(),
503 }
504 }
505}
506
507impl From<ValidationTask> for PbValidationTask {
508 fn from(validation_task: ValidationTask) -> Self {
509 Self {
510 sst_infos: validation_task
511 .sst_infos
512 .into_iter()
513 .map(|sst| sst.into())
514 .collect_vec(),
515 sst_id_to_worker_id: validation_task
516 .sst_id_to_worker_id
517 .into_iter()
518 .map(|(object_id, worker_id)| (object_id.inner(), worker_id))
519 .collect(),
520 }
521 }
522}
523
524impl ValidationTask {
525 pub fn estimated_encode_len(&self) -> usize {
526 self.sst_infos
527 .iter()
528 .map(|sst| sst.estimated_encode_len())
529 .sum::<usize>()
530 + self.sst_id_to_worker_id.len() * (size_of::<u64>() + size_of::<u32>())
531 + size_of::<u64>()
532 }
533}
534
/// Report for a compaction task, convertible to and from `PbReportTask`
/// (a `subscribe_compaction_event_request` message).
#[derive(Clone, PartialEq, Default, Debug)]
pub struct ReportTask {
    /// Per-table statistics change reported for the task.
    pub table_stats_change: HashMap<TableId, PbTableStats>,
    /// Id of the reported task.
    pub task_id: u64,
    /// Status reported for the task.
    pub task_status: TaskStatus,
    /// Output SSTs of the task.
    pub sorted_output_ssts: Vec<SstableInfo>,
    /// A `u64` timestamp per SST object id.
    /// NOTE(review): unit and clock source are not visible here — confirm.
    pub object_timestamps: HashMap<HummockSstableObjectId, u64>,
}
543
544impl From<PbReportTask> for ReportTask {
545 fn from(value: PbReportTask) -> Self {
546 Self {
547 table_stats_change: value.table_stats_change.clone(),
548 task_id: value.task_id,
549 task_status: PbTaskStatus::try_from(value.task_status).unwrap(),
550 sorted_output_ssts: value
551 .sorted_output_ssts
552 .into_iter()
553 .map(SstableInfo::from)
554 .collect_vec(),
555 object_timestamps: value
556 .object_timestamps
557 .into_iter()
558 .map(|(object_id, timestamp)| (object_id.into(), timestamp))
559 .collect(),
560 }
561 }
562}
563
564impl From<ReportTask> for PbReportTask {
565 fn from(value: ReportTask) -> Self {
566 Self {
567 table_stats_change: value.table_stats_change.clone(),
568 task_id: value.task_id,
569 task_status: value.task_status.into(),
570 sorted_output_ssts: value
571 .sorted_output_ssts
572 .into_iter()
573 .map(|sst| sst.into())
574 .collect_vec(),
575 object_timestamps: value
576 .object_timestamps
577 .into_iter()
578 .map(|(object_id, timestamp)| (object_id.inner(), timestamp))
579 .collect(),
580 }
581 }
582}