1use std::collections::{BTreeMap, HashMap};
16
17use itertools::Itertools;
18use risingwave_common::catalog::TableId;
19use risingwave_pb::hummock::compact_task::{PbTaskStatus, PbTaskType, TaskStatus, TaskType};
20use risingwave_pb::hummock::subscribe_compaction_event_request::PbReportTask;
21use risingwave_pb::hummock::{
22 CompactTaskAssignment as PbCompactTaskAssignment, LevelType, PbCompactTask, PbKeyRange,
23 PbSstableFilterLayout, PbSstableFilterType, PbTableOption, PbTableSchema, PbTableStats,
24 PbValidationTask,
25};
26use risingwave_pb::id::WorkerId;
27
28use crate::compaction_group::StateTableId;
29use crate::key_range::KeyRange;
30use crate::level::InputLevel;
31use crate::sstable_info::SstableInfo;
32use crate::table_watermark::{TableWatermarks, WatermarkSerdeType};
33use crate::{CompactionGroupId, HummockContextId, HummockSstableObjectId};
34
/// In-memory form of a compaction task.
///
/// Converted to and from the protobuf [`PbCompactTask`] by the `From` impls in this file.
#[derive(Clone, PartialEq, Default, Debug)]
pub struct CompactTask {
    /// Input levels of the task; each level lists its SSTs in `table_infos`.
    pub input_ssts: Vec<InputLevel>,
    /// Key ranges carried by the task.
    /// NOTE(review): presumably the sub-ranges the output is split into — confirm.
    pub splits: Vec<KeyRange>,
    /// Output SSTs recorded on the task.
    /// NOTE(review): the name suggests they are kept sorted — confirm with the producer.
    pub sorted_output_ssts: Vec<SstableInfo>,
    /// Identifier of this task.
    pub task_id: u64,
    /// Level the task targets; [`CompactTask::is_trivial_move_task`] compares it with the
    /// level index of the second input level.
    pub target_level: u32,
    pub gc_delete_keys: bool,
    pub base_level: u32,
    /// Status of the task.
    pub task_status: PbTaskStatus,
    /// Id of the compaction group recorded on the task.
    pub compaction_group_id: CompactionGroupId,
    /// Compaction group version recorded on the task; [`CompactTask::is_expired`] compares it
    /// with the version the caller expects.
    pub compaction_group_version_id: u64,
    pub existing_table_ids: Vec<TableId>,
    pub compression_algorithm: u32,
    pub target_file_size: u64,
    pub compaction_filter_mask: u32,
    /// Per-table options; `retention_seconds` is consulted by [`CompactTask::contains_ttl`].
    pub table_options: BTreeMap<TableId, PbTableOption>,
    pub current_epoch_time: u64,
    pub target_sub_level_id: u64,
    /// Kind of task; consulted by [`CompactTask::is_trivial_move_task`] and
    /// [`CompactTask::is_trivial_reclaim`].
    pub task_type: PbTaskType,
    pub split_by_state_table: bool,
    pub split_weight_by_vnode: u32,
    pub table_vnode_partition: BTreeMap<TableId, u32>,
    /// Watermarks whose serde type is [`WatermarkSerdeType::PkPrefix`], keyed by table id.
    pub pk_prefix_table_watermarks: BTreeMap<TableId, TableWatermarks>,

    /// Watermarks whose serde type is [`WatermarkSerdeType::NonPkPrefix`], keyed by table id.
    pub non_pk_prefix_table_watermarks: BTreeMap<TableId, TableWatermarks>,
    /// Watermarks whose serde type is [`WatermarkSerdeType::Value`], keyed by table id.
    pub value_table_watermarks: BTreeMap<TableId, TableWatermarks>,

    pub table_schemas: BTreeMap<TableId, PbTableSchema>,

    pub max_sub_compaction: u32,

    /// Key-count threshold handed to `should_use_blocked_xor_filter_by_kv_count` when the filter
    /// layout has to be chosen automatically. Stored as `max_kv_count_for_xor16` in the protobuf
    /// message.
    pub blocked_xor_filter_kv_count_threshold: Option<u64>,

    /// Optional byte limit; see [`CompactTask::effective_max_vnode_key_range_bytes`] for when it
    /// takes effect.
    pub max_vnode_key_range_bytes: Option<u64>,

    /// SST filter type carried by the task. Falls back to `SstableFilterUnspecified` when the
    /// protobuf value is not a known variant.
    pub sstable_filter_type: PbSstableFilterType,

    /// Requested SST filter layout. Falls back to `Auto` when the protobuf value is not a known
    /// variant; resolved per output by [`CompactTask::sstable_filter_layout_for_output`].
    pub sstable_filter_layout: PbSstableFilterLayout,
}
97
/// Pairs a [`CompactTask`] with a context id.
///
/// In-memory counterpart of the protobuf `CompactTaskAssignment`.
#[derive(Clone, PartialEq, Default, Debug)]
pub struct CompactTaskAssignment {
    /// The task being assigned.
    pub compact_task: CompactTask,
    /// Context id stored alongside the task.
    /// NOTE(review): presumably identifies the worker the task is assigned to — confirm.
    pub context_id: HummockContextId,
}
103
104impl CompactTask {
105 pub fn is_trivial_move_task(&self) -> bool {
106 if self.task_type != TaskType::Dynamic && self.task_type != TaskType::Emergency {
107 return false;
108 }
109
110 if self.input_ssts.len() != 2 || self.input_ssts[0].level_type != LevelType::Nonoverlapping
111 {
112 return false;
113 }
114
115 if self.input_ssts[0].level_idx == self.input_ssts[1].level_idx
117 && self.input_ssts[0].level_idx > 0
118 {
119 return false;
120 }
121
122 if self.input_ssts[1].level_idx == self.target_level
123 && self.input_ssts[1].table_infos.is_empty()
124 {
125 return true;
126 }
127
128 false
129 }
130
131 pub fn task_label(&self) -> &'static str {
132 if self.is_trivial_reclaim() {
133 return "trivial-space-reclaim";
134 }
135
136 if self.is_trivial_move_task() {
137 return "trivial-move";
138 }
139
140 "normal"
141 }
142
143 pub fn is_trivial_reclaim(&self) -> bool {
144 if self.task_type == TaskType::VnodeWatermark {
146 return true;
147 }
148 self.input_ssts
149 .iter()
150 .flat_map(|level| level.table_infos.iter())
151 .all(|sst| sst.table_ids.is_empty())
152 }
153}
154
155impl CompactTask {
156 pub fn contains_ttl(&self) -> bool {
158 self.table_options
159 .iter()
160 .any(|(_, table_option)| table_option.retention_seconds.is_some_and(|ttl| ttl > 0))
161 }
162
163 pub fn contains_range_tombstone(&self) -> bool {
165 self.read_input_ssts()
166 .any(|sst| sst.range_tombstone_count > 0)
167 }
168
169 pub fn contains_split_sst(&self) -> bool {
171 self.read_input_ssts()
172 .any(|sst| sst.sst_id.as_raw_id() != sst.object_id.as_raw_id())
173 }
174
175 pub fn get_table_ids_from_input_ssts(&self) -> impl Iterator<Item = StateTableId> + use<> {
177 self.input_ssts
178 .iter()
179 .flat_map(|level| level.table_infos.iter())
180 .flat_map(|sst| sst.table_ids.iter().copied())
181 .sorted()
182 .dedup()
183 }
184
185 pub fn is_expired(&self, compaction_group_version_id_expected: u64) -> bool {
186 is_compaction_task_expired(
187 self.compaction_group_version_id,
188 compaction_group_version_id_expected,
189 )
190 }
191
192 pub fn sstable_filter_layout_for_output(
198 &self,
199 estimated_output_key_count: u64,
200 ) -> PbSstableFilterLayout {
201 match self.sstable_filter_layout {
202 PbSstableFilterLayout::Plain => PbSstableFilterLayout::Plain,
203 PbSstableFilterLayout::Blocked => PbSstableFilterLayout::Blocked,
204 PbSstableFilterLayout::Auto | PbSstableFilterLayout::Unspecified => {
205 if crate::filter_utils::should_use_blocked_xor_filter_by_kv_count(
206 estimated_output_key_count,
207 self.blocked_xor_filter_kv_count_threshold,
208 ) {
209 PbSstableFilterLayout::Blocked
210 } else {
211 PbSstableFilterLayout::Plain
212 }
213 }
214 }
215 }
216
217 pub fn effective_max_vnode_key_range_bytes(&self) -> Option<usize> {
221 let limit = self.max_vnode_key_range_bytes.filter(|&v| v > 0)? as usize;
222 (self.get_table_ids_from_input_ssts().count() == 1).then_some(limit)
223 }
224
225 pub fn read_input_ssts(&self) -> impl Iterator<Item = &SstableInfo> {
230 self.input_ssts
231 .iter()
232 .flat_map(|level| level.read_sstable_infos())
233 }
234}
235
236fn split_watermark_serde_types(
237 pb_compact_task: &PbCompactTask,
238) -> (
239 BTreeMap<TableId, TableWatermarks>,
240 BTreeMap<TableId, TableWatermarks>,
241 BTreeMap<TableId, TableWatermarks>,
242) {
243 let mut pk_prefix_table_watermarks = BTreeMap::default();
244 let mut non_pk_prefix_table_watermarks = BTreeMap::default();
245 let mut value_table_watermarks = BTreeMap::default();
246 for (table_id, pbwatermark) in &pb_compact_task.table_watermarks {
247 let watermark = TableWatermarks::from(pbwatermark);
248 match watermark.watermark_type {
249 WatermarkSerdeType::PkPrefix => {
250 pk_prefix_table_watermarks.insert(*table_id, watermark);
251 }
252 WatermarkSerdeType::NonPkPrefix => {
253 non_pk_prefix_table_watermarks.insert(*table_id, watermark);
254 }
255 WatermarkSerdeType::Value => {
256 value_table_watermarks.insert(*table_id, watermark);
257 }
258 }
259 }
260 (
261 pk_prefix_table_watermarks,
262 non_pk_prefix_table_watermarks,
263 value_table_watermarks,
264 )
265}
266
/// Returns `true` when the compaction group version recorded in a task differs from the
/// expected one, i.e. the task is considered expired.
pub fn is_compaction_task_expired(
    compaction_group_version_id_in_task: u64,
    compaction_group_version_id_expected: u64,
) -> bool {
    let versions_match =
        compaction_group_version_id_in_task == compaction_group_version_id_expected;
    !versions_match
}
273
274impl From<PbCompactTask> for CompactTask {
275 fn from(pb_compact_task: PbCompactTask) -> Self {
276 let (pk_prefix_table_watermarks, non_pk_prefix_table_watermarks, value_table_watermarks) =
277 split_watermark_serde_types(&pb_compact_task);
278 #[expect(deprecated)]
279 Self {
280 input_ssts: pb_compact_task
281 .input_ssts
282 .into_iter()
283 .map(InputLevel::from)
284 .collect_vec(),
285 splits: pb_compact_task
286 .splits
287 .into_iter()
288 .map(|pb_keyrange| KeyRange {
289 left: pb_keyrange.left.into(),
290 right: pb_keyrange.right.into(),
291 right_exclusive: pb_keyrange.right_exclusive,
292 })
293 .collect_vec(),
294 sorted_output_ssts: pb_compact_task
295 .sorted_output_ssts
296 .into_iter()
297 .map(SstableInfo::from)
298 .collect_vec(),
299 task_id: pb_compact_task.task_id,
300 target_level: pb_compact_task.target_level,
301 gc_delete_keys: pb_compact_task.gc_delete_keys,
302 base_level: pb_compact_task.base_level,
303 task_status: TaskStatus::try_from(pb_compact_task.task_status).unwrap(),
304 compaction_group_id: pb_compact_task.compaction_group_id,
305 existing_table_ids: pb_compact_task.existing_table_ids.clone(),
306 compression_algorithm: pb_compact_task.compression_algorithm,
307 target_file_size: pb_compact_task.target_file_size,
308 compaction_filter_mask: pb_compact_task.compaction_filter_mask,
309 table_options: pb_compact_task
310 .table_options
311 .iter()
312 .map(|(table_id, v)| (*table_id, *v))
313 .collect(),
314 current_epoch_time: pb_compact_task.current_epoch_time,
315 target_sub_level_id: pb_compact_task.target_sub_level_id,
316 task_type: PbTaskType::try_from(pb_compact_task.task_type).unwrap(),
317 split_by_state_table: pb_compact_task.split_by_state_table,
318 split_weight_by_vnode: pb_compact_task.split_weight_by_vnode,
319 table_vnode_partition: pb_compact_task
320 .table_vnode_partition
321 .iter()
322 .map(|(table_id, v)| (*table_id, *v))
323 .collect(),
324 pk_prefix_table_watermarks,
325 non_pk_prefix_table_watermarks,
326 value_table_watermarks,
327 table_schemas: pb_compact_task
328 .table_schemas
329 .iter()
330 .map(|(table_id, v)| (*table_id, v.clone()))
331 .collect(),
332 max_sub_compaction: pb_compact_task.max_sub_compaction,
333 compaction_group_version_id: pb_compact_task.compaction_group_version_id,
334 blocked_xor_filter_kv_count_threshold: pb_compact_task.max_kv_count_for_xor16,
335 max_vnode_key_range_bytes: pb_compact_task.max_vnode_key_range_bytes,
336 sstable_filter_type: PbSstableFilterType::try_from(pb_compact_task.sstable_filter_type)
337 .unwrap_or(PbSstableFilterType::SstableFilterUnspecified),
338 sstable_filter_layout: PbSstableFilterLayout::try_from(
339 pb_compact_task.sstable_filter_layout,
340 )
341 .unwrap_or(PbSstableFilterLayout::Auto),
342 }
343 }
344}
345
346impl From<&PbCompactTask> for CompactTask {
347 fn from(pb_compact_task: &PbCompactTask) -> Self {
348 let (pk_prefix_table_watermarks, non_pk_prefix_table_watermarks, value_table_watermarks) =
349 split_watermark_serde_types(pb_compact_task);
350 #[expect(deprecated)]
351 Self {
352 input_ssts: pb_compact_task
353 .input_ssts
354 .iter()
355 .map(InputLevel::from)
356 .collect_vec(),
357 splits: pb_compact_task
358 .splits
359 .iter()
360 .map(|pb_keyrange| KeyRange {
361 left: pb_keyrange.left.clone().into(),
362 right: pb_keyrange.right.clone().into(),
363 right_exclusive: pb_keyrange.right_exclusive,
364 })
365 .collect_vec(),
366 sorted_output_ssts: pb_compact_task
367 .sorted_output_ssts
368 .iter()
369 .map(SstableInfo::from)
370 .collect_vec(),
371 task_id: pb_compact_task.task_id,
372 target_level: pb_compact_task.target_level,
373 gc_delete_keys: pb_compact_task.gc_delete_keys,
374 base_level: pb_compact_task.base_level,
375 task_status: TaskStatus::try_from(pb_compact_task.task_status).unwrap(),
376 compaction_group_id: pb_compact_task.compaction_group_id,
377 existing_table_ids: pb_compact_task.existing_table_ids.clone(),
378 compression_algorithm: pb_compact_task.compression_algorithm,
379 target_file_size: pb_compact_task.target_file_size,
380 compaction_filter_mask: pb_compact_task.compaction_filter_mask,
381 table_options: pb_compact_task
382 .table_options
383 .iter()
384 .map(|(table_id, v)| (*table_id, *v))
385 .collect(),
386 current_epoch_time: pb_compact_task.current_epoch_time,
387 target_sub_level_id: pb_compact_task.target_sub_level_id,
388 task_type: PbTaskType::try_from(pb_compact_task.task_type).unwrap(),
389 split_by_state_table: pb_compact_task.split_by_state_table,
390 split_weight_by_vnode: pb_compact_task.split_weight_by_vnode,
391 table_vnode_partition: pb_compact_task
392 .table_vnode_partition
393 .iter()
394 .map(|(table_id, v)| (*table_id, *v))
395 .collect(),
396 pk_prefix_table_watermarks,
397 non_pk_prefix_table_watermarks,
398 value_table_watermarks,
399 table_schemas: pb_compact_task
400 .table_schemas
401 .iter()
402 .map(|(table_id, v)| (*table_id, v.clone()))
403 .collect(),
404 max_sub_compaction: pb_compact_task.max_sub_compaction,
405 compaction_group_version_id: pb_compact_task.compaction_group_version_id,
406 blocked_xor_filter_kv_count_threshold: pb_compact_task.max_kv_count_for_xor16,
407 max_vnode_key_range_bytes: pb_compact_task.max_vnode_key_range_bytes,
408 sstable_filter_type: PbSstableFilterType::try_from(pb_compact_task.sstable_filter_type)
409 .unwrap_or(PbSstableFilterType::SstableFilterUnspecified),
410 sstable_filter_layout: PbSstableFilterLayout::try_from(
411 pb_compact_task.sstable_filter_layout,
412 )
413 .unwrap_or(PbSstableFilterLayout::Auto),
414 }
415 }
416}
417
418impl From<CompactTask> for PbCompactTask {
419 fn from(compact_task: CompactTask) -> Self {
420 #[expect(deprecated)]
421 Self {
422 input_ssts: compact_task
423 .input_ssts
424 .into_iter()
425 .map(|input_level| input_level.into())
426 .collect_vec(),
427 splits: compact_task
428 .splits
429 .into_iter()
430 .map(|keyrange| PbKeyRange {
431 left: keyrange.left.into(),
432 right: keyrange.right.into(),
433 right_exclusive: keyrange.right_exclusive,
434 })
435 .collect_vec(),
436 sorted_output_ssts: compact_task
437 .sorted_output_ssts
438 .into_iter()
439 .map(|sst| sst.into())
440 .collect_vec(),
441 task_id: compact_task.task_id,
442 target_level: compact_task.target_level,
443 gc_delete_keys: compact_task.gc_delete_keys,
444 base_level: compact_task.base_level,
445 task_status: compact_task.task_status.into(),
446 compaction_group_id: compact_task.compaction_group_id,
447 existing_table_ids: compact_task.existing_table_ids.clone(),
448 compression_algorithm: compact_task.compression_algorithm,
449 target_file_size: compact_task.target_file_size,
450 compaction_filter_mask: compact_task.compaction_filter_mask,
451 table_options: compact_task.table_options.clone(),
452 current_epoch_time: compact_task.current_epoch_time,
453 target_sub_level_id: compact_task.target_sub_level_id,
454 task_type: compact_task.task_type.into(),
455 split_weight_by_vnode: compact_task.split_weight_by_vnode,
456 table_vnode_partition: compact_task.table_vnode_partition.clone(),
457 table_watermarks: compact_task
458 .pk_prefix_table_watermarks
459 .into_iter()
460 .chain(compact_task.non_pk_prefix_table_watermarks)
461 .chain(compact_task.value_table_watermarks)
462 .map(|(table_id, table_watermark)| (table_id, table_watermark.into()))
463 .collect(),
464 split_by_state_table: compact_task.split_by_state_table,
465 table_schemas: compact_task.table_schemas.clone(),
466 max_sub_compaction: compact_task.max_sub_compaction,
467 compaction_group_version_id: compact_task.compaction_group_version_id,
468 max_kv_count_for_xor16: compact_task.blocked_xor_filter_kv_count_threshold,
469 max_vnode_key_range_bytes: compact_task.max_vnode_key_range_bytes,
470 sstable_filter_type: compact_task.sstable_filter_type.into(),
471 sstable_filter_layout: compact_task.sstable_filter_layout.into(),
472 }
473 }
474}
475
476impl From<&CompactTask> for PbCompactTask {
477 fn from(compact_task: &CompactTask) -> Self {
478 #[expect(deprecated)]
479 Self {
480 input_ssts: compact_task
481 .input_ssts
482 .iter()
483 .map(|input_level| input_level.into())
484 .collect_vec(),
485 splits: compact_task
486 .splits
487 .iter()
488 .map(|keyrange| PbKeyRange {
489 left: keyrange.left.to_vec(),
490 right: keyrange.right.to_vec(),
491 right_exclusive: keyrange.right_exclusive,
492 })
493 .collect_vec(),
494 sorted_output_ssts: compact_task
495 .sorted_output_ssts
496 .iter()
497 .map(|sst| sst.into())
498 .collect_vec(),
499 task_id: compact_task.task_id,
500 target_level: compact_task.target_level,
501 gc_delete_keys: compact_task.gc_delete_keys,
502 base_level: compact_task.base_level,
503 task_status: compact_task.task_status.into(),
504 compaction_group_id: compact_task.compaction_group_id,
505 existing_table_ids: compact_task.existing_table_ids.clone(),
506 compression_algorithm: compact_task.compression_algorithm,
507 target_file_size: compact_task.target_file_size,
508 compaction_filter_mask: compact_task.compaction_filter_mask,
509 table_options: compact_task.table_options.clone(),
510 current_epoch_time: compact_task.current_epoch_time,
511 target_sub_level_id: compact_task.target_sub_level_id,
512 task_type: compact_task.task_type.into(),
513 split_weight_by_vnode: compact_task.split_weight_by_vnode,
514 table_vnode_partition: compact_task.table_vnode_partition.clone(),
515 table_watermarks: compact_task
516 .pk_prefix_table_watermarks
517 .iter()
518 .chain(compact_task.non_pk_prefix_table_watermarks.iter())
519 .chain(compact_task.value_table_watermarks.iter())
520 .map(|(table_id, table_watermark)| (*table_id, table_watermark.into()))
521 .collect(),
522 split_by_state_table: compact_task.split_by_state_table,
523 table_schemas: compact_task.table_schemas.clone(),
524 max_sub_compaction: compact_task.max_sub_compaction,
525 compaction_group_version_id: compact_task.compaction_group_version_id,
526 max_kv_count_for_xor16: compact_task.blocked_xor_filter_kv_count_threshold,
527 max_vnode_key_range_bytes: compact_task.max_vnode_key_range_bytes,
528 sstable_filter_type: compact_task.sstable_filter_type.into(),
529 sstable_filter_layout: compact_task.sstable_filter_layout.into(),
530 }
531 }
532}
533
534impl From<PbCompactTaskAssignment> for CompactTaskAssignment {
535 fn from(assignment: PbCompactTaskAssignment) -> Self {
536 Self {
537 compact_task: CompactTask::from(assignment.compact_task.unwrap()),
538 context_id: assignment.context_id,
539 }
540 }
541}
542
543impl From<&PbCompactTaskAssignment> for CompactTaskAssignment {
544 fn from(assignment: &PbCompactTaskAssignment) -> Self {
545 Self {
546 compact_task: CompactTask::from(assignment.compact_task.as_ref().unwrap()),
547 context_id: assignment.context_id,
548 }
549 }
550}
551
552impl From<CompactTaskAssignment> for PbCompactTaskAssignment {
553 fn from(assignment: CompactTaskAssignment) -> Self {
554 Self {
555 compact_task: Some(assignment.compact_task.into()),
556 context_id: assignment.context_id,
557 }
558 }
559}
560
561impl From<&CompactTaskAssignment> for PbCompactTaskAssignment {
562 fn from(assignment: &CompactTaskAssignment) -> Self {
563 Self {
564 compact_task: Some((&assignment.compact_task).into()),
565 context_id: assignment.context_id,
566 }
567 }
568}
569
570#[derive(Clone, PartialEq, Default)]
571pub struct ValidationTask {
572 pub sst_infos: Vec<SstableInfo>,
573 pub sst_id_to_worker_id: HashMap<HummockSstableObjectId, WorkerId>,
574}
575
576impl From<PbValidationTask> for ValidationTask {
577 fn from(pb_validation_task: PbValidationTask) -> Self {
578 Self {
579 sst_infos: pb_validation_task
580 .sst_infos
581 .into_iter()
582 .map(SstableInfo::from)
583 .collect_vec(),
584 sst_id_to_worker_id: pb_validation_task.sst_id_to_worker_id.into_iter().collect(),
585 }
586 }
587}
588
589impl From<ValidationTask> for PbValidationTask {
590 fn from(validation_task: ValidationTask) -> Self {
591 Self {
592 sst_infos: validation_task
593 .sst_infos
594 .into_iter()
595 .map(|sst| sst.into())
596 .collect_vec(),
597 sst_id_to_worker_id: validation_task.sst_id_to_worker_id,
598 }
599 }
600}
601
/// In-memory counterpart of the protobuf `ReportTask` event payload ([`PbReportTask`]).
#[derive(Clone, PartialEq, Default, Debug)]
pub struct ReportTask {
    /// Table statistics keyed by table id.
    /// NOTE(review): the name suggests these are deltas produced by the task — confirm.
    pub table_stats_change: HashMap<TableId, PbTableStats>,
    /// Identifier of the reported task.
    pub task_id: u64,
    /// Status being reported.
    pub task_status: TaskStatus,
    /// Output SSTs of the reported task.
    pub sorted_output_ssts: Vec<SstableInfo>,
    /// A `u64` timestamp per SST object id.
    /// NOTE(review): unit and meaning of the timestamp are not visible here — confirm.
    pub object_timestamps: HashMap<HummockSstableObjectId, u64>,
}
610
611impl From<PbReportTask> for ReportTask {
612 fn from(value: PbReportTask) -> Self {
613 Self {
614 table_stats_change: value.table_stats_change.clone(),
615 task_id: value.task_id,
616 task_status: PbTaskStatus::try_from(value.task_status).unwrap(),
617 sorted_output_ssts: value
618 .sorted_output_ssts
619 .into_iter()
620 .map(SstableInfo::from)
621 .collect_vec(),
622 object_timestamps: value.object_timestamps,
623 }
624 }
625}
626
627impl From<ReportTask> for PbReportTask {
628 fn from(value: ReportTask) -> Self {
629 Self {
630 table_stats_change: value.table_stats_change.clone(),
631 task_id: value.task_id,
632 task_status: value.task_status.into(),
633 sorted_output_ssts: value
634 .sorted_output_ssts
635 .into_iter()
636 .map(|sst| sst.into())
637 .collect_vec(),
638 object_timestamps: value.object_timestamps,
639 }
640 }
641}
642
// Unit tests for the `CompactTask` helpers and the protobuf round trip.
#[cfg(test)]
mod tests {
    use risingwave_common::catalog::TableId;
    use risingwave_pb::hummock::compact_task::TaskType;
    use risingwave_pb::hummock::{PbCompactTask, PbLevelType, PbSstableFilterLayout};

    use super::CompactTask;
    use crate::level::InputLevel;
    use crate::sstable_info::{SstableInfo, SstableInfoInner};

    /// Builds an SST whose `object_id` and `sst_id` are both `sst_id`, with the given table ids.
    fn test_sstable(sst_id: u64, table_ids: Vec<TableId>) -> SstableInfo {
        SstableInfo::from(SstableInfoInner {
            object_id: sst_id.into(),
            sst_id: sst_id.into(),
            table_ids,
            ..Default::default()
        })
    }

    /// Builds an SST with differing `object_id` (1) and `sst_id` (2), one range tombstone and
    /// 100 keys, so that both `contains_range_tombstone` and `contains_split_sst` can fire.
    fn test_read_property_sstable(table_ids: Vec<TableId>) -> SstableInfo {
        SstableInfo::from(SstableInfoInner {
            object_id: 1.into(),
            sst_id: 2.into(),
            table_ids,
            range_tombstone_count: 1,
            total_key_count: 100,
            ..Default::default()
        })
    }

    /// `max_kv_count_for_xor16` must survive a protobuf -> in-memory -> protobuf round trip
    /// under its in-memory name `blocked_xor_filter_kv_count_threshold`.
    #[test]
    fn test_blocked_xor_filter_kv_count_threshold_roundtrip() {
        let pb = PbCompactTask {
            max_kv_count_for_xor16: Some(123),
            ..Default::default()
        };

        let task = CompactTask::from(&pb);
        assert_eq!(task.blocked_xor_filter_kv_count_threshold, Some(123));

        let pb2 = PbCompactTask::from(&task);
        assert_eq!(pb2.max_kv_count_for_xor16, Some(123));
    }

    /// Checks how the output filter layout is resolved for `Auto`, `Plain` and `Blocked`.
    #[test]
    fn test_sstable_filter_layout_for_output() {
        // `Auto` with threshold 100: a count equal to the threshold stays `Plain`, one above it
        // switches to `Blocked`.
        let task = CompactTask {
            sstable_filter_layout: PbSstableFilterLayout::Auto,
            blocked_xor_filter_kv_count_threshold: Some(100),
            ..Default::default()
        };

        assert_eq!(
            task.sstable_filter_layout_for_output(100),
            PbSstableFilterLayout::Plain
        );
        assert_eq!(
            task.sstable_filter_layout_for_output(101),
            PbSstableFilterLayout::Blocked
        );

        // An explicit `Plain` layout ignores the key count.
        let task = CompactTask {
            sstable_filter_layout: PbSstableFilterLayout::Plain,
            blocked_xor_filter_kv_count_threshold: Some(100),
            ..Default::default()
        };

        assert_eq!(
            task.sstable_filter_layout_for_output(101),
            PbSstableFilterLayout::Plain
        );

        // An explicit `Blocked` layout ignores the key count as well.
        let task = CompactTask {
            sstable_filter_layout: PbSstableFilterLayout::Blocked,
            blocked_xor_filter_kv_count_threshold: Some(100),
            ..Default::default()
        };

        assert_eq!(
            task.sstable_filter_layout_for_output(0),
            PbSstableFilterLayout::Blocked
        );
    }

    /// A task whose only input SST lists no table ids yields no input table ids and counts as
    /// a trivial reclaim, regardless of `existing_table_ids`.
    #[test]
    fn test_empty_table_ids_are_reclaim_and_have_no_input_table_ids() {
        let task = CompactTask {
            input_ssts: vec![InputLevel {
                table_infos: vec![test_sstable(1, vec![])],
                ..Default::default()
            }],
            existing_table_ids: vec![TableId::new(1)],
            ..Default::default()
        };

        assert!(task.get_table_ids_from_input_ssts().next().is_none());
        assert!(task.is_trivial_reclaim());
    }

    /// The read-side predicates must not be triggered by an SST without table ids, but must be
    /// triggered by the same SST once it lists a table id.
    #[test]
    fn test_read_properties_ignore_empty_table_ids() {
        // The tombstone-carrying, split SST has no table ids; the other SST is unremarkable.
        let task = CompactTask {
            input_ssts: vec![InputLevel {
                table_infos: vec![
                    test_read_property_sstable(vec![]),
                    SstableInfo::from(SstableInfoInner {
                        object_id: 3.into(),
                        sst_id: 3.into(),
                        table_ids: vec![TableId::new(1)],
                        total_key_count: 1,
                        ..Default::default()
                    }),
                ],
                ..Default::default()
            }],
            ..Default::default()
        };

        assert!(!task.contains_range_tombstone());
        assert!(!task.contains_split_sst());

        // Same special SST, now with a table id: both predicates fire.
        let task = CompactTask {
            input_ssts: vec![InputLevel {
                table_infos: vec![test_read_property_sstable(vec![TableId::new(1)])],
                ..Default::default()
            }],
            ..Default::default()
        };

        assert!(task.contains_range_tombstone());
        assert!(task.contains_split_sst());
    }

    /// `task_label` reports "trivial-move" for a pure move and gives "trivial-space-reclaim"
    /// precedence when a task satisfies both predicates.
    #[test]
    fn test_task_label_for_trivial_move_and_reclaim() {
        // One input SST still lists a table id, so this is a move but not a reclaim.
        let trivial_move_task = CompactTask {
            input_ssts: vec![
                InputLevel {
                    level_idx: 1,
                    level_type: PbLevelType::Nonoverlapping,
                    table_infos: vec![
                        test_sstable(10, vec![TableId::new(1)]),
                        test_sstable(11, vec![]),
                    ],
                },
                InputLevel {
                    level_idx: 2,
                    level_type: PbLevelType::Nonoverlapping,
                    table_infos: vec![],
                },
            ],
            target_level: 2,
            task_type: TaskType::Dynamic,
            ..Default::default()
        };

        assert!(!trivial_move_task.is_trivial_reclaim());
        assert_eq!(trivial_move_task.task_label(), "trivial-move");

        // No input SST lists a table id: both predicates hold and reclaim wins the label.
        let trivial_reclaim_task = CompactTask {
            input_ssts: vec![
                InputLevel {
                    level_idx: 1,
                    level_type: PbLevelType::Nonoverlapping,
                    table_infos: vec![test_sstable(10, vec![])],
                },
                InputLevel {
                    level_idx: 2,
                    level_type: PbLevelType::Nonoverlapping,
                    table_infos: vec![],
                },
            ],
            target_level: 2,
            task_type: TaskType::Dynamic,
            ..Default::default()
        };

        assert!(trivial_reclaim_task.is_trivial_reclaim());
        assert!(trivial_reclaim_task.is_trivial_move_task());
        assert_eq!(trivial_reclaim_task.task_label(), "trivial-space-reclaim");
    }
}