1use std::collections::{BTreeMap, HashMap, HashSet};
16use std::mem::size_of;
17
18use itertools::Itertools;
19use risingwave_common::catalog::TableId;
20use risingwave_pb::hummock::compact_task::{PbTaskStatus, PbTaskType, TaskStatus, TaskType};
21use risingwave_pb::hummock::subscribe_compaction_event_request::PbReportTask;
22use risingwave_pb::hummock::{
23 LevelType, PbCompactTask, PbKeyRange, PbTableOption, PbTableSchema, PbTableStats,
24 PbValidationTask,
25};
26
27use crate::HummockSstableObjectId;
28use crate::compaction_group::StateTableId;
29use crate::key_range::KeyRange;
30use crate::level::InputLevel;
31use crate::sstable_info::SstableInfo;
32use crate::table_watermark::{TableWatermarks, WatermarkSerdeType};
33
34#[derive(Clone, PartialEq, Default, Debug)]
35pub struct CompactTask {
36 pub input_ssts: Vec<InputLevel>,
38 pub splits: Vec<KeyRange>,
41 pub sorted_output_ssts: Vec<SstableInfo>,
43 pub task_id: u64,
45 pub target_level: u32,
47 pub gc_delete_keys: bool,
48 pub base_level: u32,
50 pub task_status: PbTaskStatus,
51 pub compaction_group_id: u64,
53 pub compaction_group_version_id: u64,
55 pub existing_table_ids: Vec<TableId>,
57 pub compression_algorithm: u32,
58 pub target_file_size: u64,
59 pub compaction_filter_mask: u32,
60 pub table_options: BTreeMap<TableId, PbTableOption>,
61 pub current_epoch_time: u64,
62 pub target_sub_level_id: u64,
63 pub task_type: PbTaskType,
65 pub split_by_state_table: bool,
67 pub split_weight_by_vnode: u32,
70 pub table_vnode_partition: BTreeMap<TableId, u32>,
71 pub pk_prefix_table_watermarks: BTreeMap<TableId, TableWatermarks>,
74
75 pub non_pk_prefix_table_watermarks: BTreeMap<TableId, TableWatermarks>,
76 pub value_table_watermarks: BTreeMap<TableId, TableWatermarks>,
77
78 pub table_schemas: BTreeMap<TableId, PbTableSchema>,
79
80 pub max_sub_compaction: u32,
81
82 pub max_kv_count_for_xor16: Option<u64>,
83}
84
85impl CompactTask {
86 pub fn estimated_encode_len(&self) -> usize {
87 self.input_ssts
88 .iter()
89 .map(|input_level| input_level.estimated_encode_len())
90 .sum::<usize>()
91 + self
92 .splits
93 .iter()
94 .map(|split| split.left.len() + split.right.len() + size_of::<bool>())
95 .sum::<usize>()
96 + size_of::<u64>()
97 + self
98 .sorted_output_ssts
99 .iter()
100 .map(|sst| sst.estimated_encode_len())
101 .sum::<usize>()
102 + size_of::<u64>()
103 + size_of::<u32>()
104 + size_of::<bool>()
105 + size_of::<u32>()
106 + size_of::<i32>()
107 + size_of::<u64>()
108 + self.existing_table_ids.len() * size_of::<u32>()
109 + size_of::<u32>()
110 + size_of::<u64>()
111 + size_of::<u32>()
112 + self.table_options.len() * size_of::<u64>()
113 + size_of::<u64>()
114 + size_of::<u64>()
115 + size_of::<i32>()
116 + size_of::<bool>()
117 + size_of::<u32>()
118 + self.table_vnode_partition.len() * size_of::<u64>()
119 + self
120 .pk_prefix_table_watermarks
121 .values()
122 .map(|table_watermark| size_of::<u32>() + table_watermark.estimated_encode_len())
123 .sum::<usize>()
124 + self
125 .non_pk_prefix_table_watermarks
126 .values()
127 .map(|table_watermark| size_of::<u32>() + table_watermark.estimated_encode_len())
128 .sum::<usize>()
129 + self
130 .value_table_watermarks
131 .values()
132 .map(|table_watermark| size_of::<u32>() + table_watermark.estimated_encode_len())
133 .sum::<usize>()
134 }
135
136 pub fn is_trivial_move_task(&self) -> bool {
137 if self.task_type != TaskType::Dynamic && self.task_type != TaskType::Emergency {
138 return false;
139 }
140
141 if self.input_ssts.len() != 2 || self.input_ssts[0].level_type != LevelType::Nonoverlapping
142 {
143 return false;
144 }
145
146 if self.input_ssts[0].level_idx == self.input_ssts[1].level_idx
148 && self.input_ssts[0].level_idx > 0
149 {
150 return false;
151 }
152
153 if self.input_ssts[1].level_idx == self.target_level
154 && self.input_ssts[1].table_infos.is_empty()
155 {
156 return true;
157 }
158
159 false
160 }
161
162 pub fn is_trivial_reclaim(&self) -> bool {
163 if self.task_type == TaskType::VnodeWatermark {
165 return true;
166 }
167 let exist_table_ids =
168 HashSet::<TableId>::from_iter(self.existing_table_ids.iter().copied());
169 self.input_ssts.iter().all(|level| {
170 level.table_infos.iter().all(|sst| {
171 sst.table_ids
172 .iter()
173 .all(|table_id| !exist_table_ids.contains(table_id))
174 })
175 })
176 }
177}
178
179impl CompactTask {
180 pub fn contains_ttl(&self) -> bool {
182 self.table_options
183 .iter()
184 .any(|(_, table_option)| table_option.retention_seconds.is_some_and(|ttl| ttl > 0))
185 }
186
187 pub fn contains_range_tombstone(&self) -> bool {
189 self.input_ssts
190 .iter()
191 .flat_map(|level| level.table_infos.iter())
192 .any(|sst| sst.range_tombstone_count > 0)
193 }
194
195 pub fn contains_split_sst(&self) -> bool {
197 self.input_ssts
198 .iter()
199 .flat_map(|level| level.table_infos.iter())
200 .any(|sst| sst.sst_id.inner() != sst.object_id.inner())
201 }
202
203 pub fn get_table_ids_from_input_ssts(&self) -> impl Iterator<Item = StateTableId> + use<> {
204 self.input_ssts
205 .iter()
206 .flat_map(|level| level.table_infos.iter())
207 .flat_map(|sst| sst.table_ids.clone())
208 .sorted()
209 .dedup()
210 }
211
212 pub fn build_compact_table_ids(&self) -> Vec<StateTableId> {
214 let existing_table_ids: HashSet<TableId> =
215 HashSet::from_iter(self.existing_table_ids.clone());
216 self.get_table_ids_from_input_ssts()
217 .filter(|table_id| existing_table_ids.contains(table_id))
218 .collect()
219 }
220
221 pub fn is_expired(&self, compaction_group_version_id_expected: u64) -> bool {
222 is_compaction_task_expired(
223 self.compaction_group_version_id,
224 compaction_group_version_id_expected,
225 )
226 }
227
228 pub fn should_use_block_based_filter(&self) -> bool {
231 let kv_count = self
232 .input_ssts
233 .iter()
234 .flat_map(|level| level.table_infos.iter())
235 .map(|sst| sst.total_key_count)
236 .sum::<u64>();
237
238 crate::filter_utils::is_kv_count_too_large_for_xor16(kv_count, self.max_kv_count_for_xor16)
239 }
240}
241
242fn split_watermark_serde_types(
243 pb_compact_task: &PbCompactTask,
244) -> (
245 BTreeMap<TableId, TableWatermarks>,
246 BTreeMap<TableId, TableWatermarks>,
247 BTreeMap<TableId, TableWatermarks>,
248) {
249 let mut pk_prefix_table_watermarks = BTreeMap::default();
250 let mut non_pk_prefix_table_watermarks = BTreeMap::default();
251 let mut value_table_watermarks = BTreeMap::default();
252 for (table_id, pbwatermark) in &pb_compact_task.table_watermarks {
253 let watermark = TableWatermarks::from(pbwatermark);
254 match watermark.watermark_type {
255 WatermarkSerdeType::PkPrefix => {
256 pk_prefix_table_watermarks.insert(*table_id, watermark);
257 }
258 WatermarkSerdeType::NonPkPrefix => {
259 non_pk_prefix_table_watermarks.insert(*table_id, watermark);
260 }
261 WatermarkSerdeType::Value => {
262 value_table_watermarks.insert(*table_id, watermark);
263 }
264 }
265 }
266 (
267 pk_prefix_table_watermarks,
268 non_pk_prefix_table_watermarks,
269 value_table_watermarks,
270 )
271}
272
/// Returns `true` when the compaction group version id stored in a task no longer equals the
/// expected one.
pub fn is_compaction_task_expired(
    compaction_group_version_id_in_task: u64,
    compaction_group_version_id_expected: u64,
) -> bool {
    let versions_match =
        compaction_group_version_id_in_task == compaction_group_version_id_expected;
    !versions_match
}
279
280impl From<PbCompactTask> for CompactTask {
281 fn from(pb_compact_task: PbCompactTask) -> Self {
282 let (pk_prefix_table_watermarks, non_pk_prefix_table_watermarks, value_table_watermarks) =
283 split_watermark_serde_types(&pb_compact_task);
284 #[expect(deprecated)]
285 Self {
286 input_ssts: pb_compact_task
287 .input_ssts
288 .into_iter()
289 .map(InputLevel::from)
290 .collect_vec(),
291 splits: pb_compact_task
292 .splits
293 .into_iter()
294 .map(|pb_keyrange| KeyRange {
295 left: pb_keyrange.left.into(),
296 right: pb_keyrange.right.into(),
297 right_exclusive: pb_keyrange.right_exclusive,
298 })
299 .collect_vec(),
300 sorted_output_ssts: pb_compact_task
301 .sorted_output_ssts
302 .into_iter()
303 .map(SstableInfo::from)
304 .collect_vec(),
305 task_id: pb_compact_task.task_id,
306 target_level: pb_compact_task.target_level,
307 gc_delete_keys: pb_compact_task.gc_delete_keys,
308 base_level: pb_compact_task.base_level,
309 task_status: TaskStatus::try_from(pb_compact_task.task_status).unwrap(),
310 compaction_group_id: pb_compact_task.compaction_group_id,
311 existing_table_ids: pb_compact_task.existing_table_ids.clone(),
312 compression_algorithm: pb_compact_task.compression_algorithm,
313 target_file_size: pb_compact_task.target_file_size,
314 compaction_filter_mask: pb_compact_task.compaction_filter_mask,
315 table_options: pb_compact_task
316 .table_options
317 .iter()
318 .map(|(table_id, v)| (*table_id, *v))
319 .collect(),
320 current_epoch_time: pb_compact_task.current_epoch_time,
321 target_sub_level_id: pb_compact_task.target_sub_level_id,
322 task_type: PbTaskType::try_from(pb_compact_task.task_type).unwrap(),
323 split_by_state_table: pb_compact_task.split_by_state_table,
324 split_weight_by_vnode: pb_compact_task.split_weight_by_vnode,
325 table_vnode_partition: pb_compact_task
326 .table_vnode_partition
327 .iter()
328 .map(|(table_id, v)| (*table_id, *v))
329 .collect(),
330 pk_prefix_table_watermarks,
331 non_pk_prefix_table_watermarks,
332 value_table_watermarks,
333 table_schemas: pb_compact_task
334 .table_schemas
335 .iter()
336 .map(|(table_id, v)| (*table_id, v.clone()))
337 .collect(),
338 max_sub_compaction: pb_compact_task.max_sub_compaction,
339 compaction_group_version_id: pb_compact_task.compaction_group_version_id,
340 max_kv_count_for_xor16: pb_compact_task.max_kv_count_for_xor16,
341 }
342 }
343}
344
345impl From<&PbCompactTask> for CompactTask {
346 fn from(pb_compact_task: &PbCompactTask) -> Self {
347 let (pk_prefix_table_watermarks, non_pk_prefix_table_watermarks, value_table_watermarks) =
348 split_watermark_serde_types(pb_compact_task);
349 #[expect(deprecated)]
350 Self {
351 input_ssts: pb_compact_task
352 .input_ssts
353 .iter()
354 .map(InputLevel::from)
355 .collect_vec(),
356 splits: pb_compact_task
357 .splits
358 .iter()
359 .map(|pb_keyrange| KeyRange {
360 left: pb_keyrange.left.clone().into(),
361 right: pb_keyrange.right.clone().into(),
362 right_exclusive: pb_keyrange.right_exclusive,
363 })
364 .collect_vec(),
365 sorted_output_ssts: pb_compact_task
366 .sorted_output_ssts
367 .iter()
368 .map(SstableInfo::from)
369 .collect_vec(),
370 task_id: pb_compact_task.task_id,
371 target_level: pb_compact_task.target_level,
372 gc_delete_keys: pb_compact_task.gc_delete_keys,
373 base_level: pb_compact_task.base_level,
374 task_status: TaskStatus::try_from(pb_compact_task.task_status).unwrap(),
375 compaction_group_id: pb_compact_task.compaction_group_id,
376 existing_table_ids: pb_compact_task.existing_table_ids.clone(),
377 compression_algorithm: pb_compact_task.compression_algorithm,
378 target_file_size: pb_compact_task.target_file_size,
379 compaction_filter_mask: pb_compact_task.compaction_filter_mask,
380 table_options: pb_compact_task
381 .table_options
382 .iter()
383 .map(|(table_id, v)| (*table_id, *v))
384 .collect(),
385 current_epoch_time: pb_compact_task.current_epoch_time,
386 target_sub_level_id: pb_compact_task.target_sub_level_id,
387 task_type: PbTaskType::try_from(pb_compact_task.task_type).unwrap(),
388 split_by_state_table: pb_compact_task.split_by_state_table,
389 split_weight_by_vnode: pb_compact_task.split_weight_by_vnode,
390 table_vnode_partition: pb_compact_task
391 .table_vnode_partition
392 .iter()
393 .map(|(table_id, v)| (*table_id, *v))
394 .collect(),
395 pk_prefix_table_watermarks,
396 non_pk_prefix_table_watermarks,
397 value_table_watermarks,
398 table_schemas: pb_compact_task
399 .table_schemas
400 .iter()
401 .map(|(table_id, v)| (*table_id, v.clone()))
402 .collect(),
403 max_sub_compaction: pb_compact_task.max_sub_compaction,
404 compaction_group_version_id: pb_compact_task.compaction_group_version_id,
405 max_kv_count_for_xor16: pb_compact_task.max_kv_count_for_xor16,
406 }
407 }
408}
409
410impl From<CompactTask> for PbCompactTask {
411 fn from(compact_task: CompactTask) -> Self {
412 #[expect(deprecated)]
413 Self {
414 input_ssts: compact_task
415 .input_ssts
416 .into_iter()
417 .map(|input_level| input_level.into())
418 .collect_vec(),
419 splits: compact_task
420 .splits
421 .into_iter()
422 .map(|keyrange| PbKeyRange {
423 left: keyrange.left.into(),
424 right: keyrange.right.into(),
425 right_exclusive: keyrange.right_exclusive,
426 })
427 .collect_vec(),
428 sorted_output_ssts: compact_task
429 .sorted_output_ssts
430 .into_iter()
431 .map(|sst| sst.into())
432 .collect_vec(),
433 task_id: compact_task.task_id,
434 target_level: compact_task.target_level,
435 gc_delete_keys: compact_task.gc_delete_keys,
436 base_level: compact_task.base_level,
437 task_status: compact_task.task_status.into(),
438 compaction_group_id: compact_task.compaction_group_id,
439 existing_table_ids: compact_task.existing_table_ids.clone(),
440 compression_algorithm: compact_task.compression_algorithm,
441 target_file_size: compact_task.target_file_size,
442 compaction_filter_mask: compact_task.compaction_filter_mask,
443 table_options: compact_task.table_options.clone(),
444 current_epoch_time: compact_task.current_epoch_time,
445 target_sub_level_id: compact_task.target_sub_level_id,
446 task_type: compact_task.task_type.into(),
447 split_weight_by_vnode: compact_task.split_weight_by_vnode,
448 table_vnode_partition: compact_task.table_vnode_partition.clone(),
449 table_watermarks: compact_task
450 .pk_prefix_table_watermarks
451 .into_iter()
452 .chain(compact_task.non_pk_prefix_table_watermarks)
453 .chain(compact_task.value_table_watermarks)
454 .map(|(table_id, table_watermark)| (table_id, table_watermark.into()))
455 .collect(),
456 split_by_state_table: compact_task.split_by_state_table,
457 table_schemas: compact_task.table_schemas.clone(),
458 max_sub_compaction: compact_task.max_sub_compaction,
459 compaction_group_version_id: compact_task.compaction_group_version_id,
460 max_kv_count_for_xor16: compact_task.max_kv_count_for_xor16,
461 }
462 }
463}
464
465impl From<&CompactTask> for PbCompactTask {
466 fn from(compact_task: &CompactTask) -> Self {
467 #[expect(deprecated)]
468 Self {
469 input_ssts: compact_task
470 .input_ssts
471 .iter()
472 .map(|input_level| input_level.into())
473 .collect_vec(),
474 splits: compact_task
475 .splits
476 .iter()
477 .map(|keyrange| PbKeyRange {
478 left: keyrange.left.to_vec(),
479 right: keyrange.right.to_vec(),
480 right_exclusive: keyrange.right_exclusive,
481 })
482 .collect_vec(),
483 sorted_output_ssts: compact_task
484 .sorted_output_ssts
485 .iter()
486 .map(|sst| sst.into())
487 .collect_vec(),
488 task_id: compact_task.task_id,
489 target_level: compact_task.target_level,
490 gc_delete_keys: compact_task.gc_delete_keys,
491 base_level: compact_task.base_level,
492 task_status: compact_task.task_status.into(),
493 compaction_group_id: compact_task.compaction_group_id,
494 existing_table_ids: compact_task.existing_table_ids.clone(),
495 compression_algorithm: compact_task.compression_algorithm,
496 target_file_size: compact_task.target_file_size,
497 compaction_filter_mask: compact_task.compaction_filter_mask,
498 table_options: compact_task.table_options.clone(),
499 current_epoch_time: compact_task.current_epoch_time,
500 target_sub_level_id: compact_task.target_sub_level_id,
501 task_type: compact_task.task_type.into(),
502 split_weight_by_vnode: compact_task.split_weight_by_vnode,
503 table_vnode_partition: compact_task.table_vnode_partition.clone(),
504 table_watermarks: compact_task
505 .pk_prefix_table_watermarks
506 .iter()
507 .chain(compact_task.non_pk_prefix_table_watermarks.iter())
508 .chain(compact_task.value_table_watermarks.iter())
509 .map(|(table_id, table_watermark)| (*table_id, table_watermark.into()))
510 .collect(),
511 split_by_state_table: compact_task.split_by_state_table,
512 table_schemas: compact_task.table_schemas.clone(),
513 max_sub_compaction: compact_task.max_sub_compaction,
514 compaction_group_version_id: compact_task.compaction_group_version_id,
515 max_kv_count_for_xor16: compact_task.max_kv_count_for_xor16,
516 }
517 }
518}
519
520#[derive(Clone, PartialEq, Default)]
521pub struct ValidationTask {
522 pub sst_infos: Vec<SstableInfo>,
523 pub sst_id_to_worker_id: HashMap<HummockSstableObjectId, u32>,
524}
525
526impl From<PbValidationTask> for ValidationTask {
527 fn from(pb_validation_task: PbValidationTask) -> Self {
528 Self {
529 sst_infos: pb_validation_task
530 .sst_infos
531 .into_iter()
532 .map(SstableInfo::from)
533 .collect_vec(),
534 sst_id_to_worker_id: pb_validation_task
535 .sst_id_to_worker_id
536 .into_iter()
537 .map(|(object_id, worker_id)| (object_id.into(), worker_id))
538 .collect(),
539 }
540 }
541}
542
543impl From<ValidationTask> for PbValidationTask {
544 fn from(validation_task: ValidationTask) -> Self {
545 Self {
546 sst_infos: validation_task
547 .sst_infos
548 .into_iter()
549 .map(|sst| sst.into())
550 .collect_vec(),
551 sst_id_to_worker_id: validation_task
552 .sst_id_to_worker_id
553 .into_iter()
554 .map(|(object_id, worker_id)| (object_id.inner(), worker_id))
555 .collect(),
556 }
557 }
558}
559
560impl ValidationTask {
561 pub fn estimated_encode_len(&self) -> usize {
562 self.sst_infos
563 .iter()
564 .map(|sst| sst.estimated_encode_len())
565 .sum::<usize>()
566 + self.sst_id_to_worker_id.len() * (size_of::<u64>() + size_of::<u32>())
567 + size_of::<u64>()
568 }
569}
570
571#[derive(Clone, PartialEq, Default, Debug)]
572pub struct ReportTask {
573 pub table_stats_change: HashMap<TableId, PbTableStats>,
574 pub task_id: u64,
575 pub task_status: TaskStatus,
576 pub sorted_output_ssts: Vec<SstableInfo>,
577 pub object_timestamps: HashMap<HummockSstableObjectId, u64>,
578}
579
580impl From<PbReportTask> for ReportTask {
581 fn from(value: PbReportTask) -> Self {
582 Self {
583 table_stats_change: value.table_stats_change.clone(),
584 task_id: value.task_id,
585 task_status: PbTaskStatus::try_from(value.task_status).unwrap(),
586 sorted_output_ssts: value
587 .sorted_output_ssts
588 .into_iter()
589 .map(SstableInfo::from)
590 .collect_vec(),
591 object_timestamps: value
592 .object_timestamps
593 .into_iter()
594 .map(|(object_id, timestamp)| (object_id.into(), timestamp))
595 .collect(),
596 }
597 }
598}
599
600impl From<ReportTask> for PbReportTask {
601 fn from(value: ReportTask) -> Self {
602 Self {
603 table_stats_change: value.table_stats_change.clone(),
604 task_id: value.task_id,
605 task_status: value.task_status.into(),
606 sorted_output_ssts: value
607 .sorted_output_ssts
608 .into_iter()
609 .map(|sst| sst.into())
610 .collect_vec(),
611 object_timestamps: value
612 .object_timestamps
613 .into_iter()
614 .map(|(object_id, timestamp)| (object_id.inner(), timestamp))
615 .collect(),
616 }
617 }
618}