1use std::collections::{BTreeMap, HashMap, HashSet};
16use std::mem::size_of;
17
18use itertools::Itertools;
19use risingwave_common::catalog::TableId;
20use risingwave_pb::hummock::compact_task::{PbTaskStatus, PbTaskType, TaskStatus, TaskType};
21use risingwave_pb::hummock::subscribe_compaction_event_request::PbReportTask;
22use risingwave_pb::hummock::{
23 LevelType, PbCompactTask, PbKeyRange, PbTableOption, PbTableSchema, PbTableStats,
24 PbValidationTask,
25};
26use risingwave_pb::id::WorkerId;
27
28use crate::compaction_group::StateTableId;
29use crate::key_range::KeyRange;
30use crate::level::InputLevel;
31use crate::sstable_info::SstableInfo;
32use crate::table_watermark::{TableWatermarks, WatermarkSerdeType};
33use crate::{CompactionGroupId, HummockSstableObjectId};
34
/// In-memory form of a compaction task.
///
/// Converts to and from [`PbCompactTask`] via the `From` impls in this file. The protobuf
/// message carries one `table_watermarks` map; here it is kept as three maps, split by each
/// entry's `watermark_type`.
#[derive(Clone, PartialEq, Default, Debug)]
pub struct CompactTask {
    /// Input levels of the task. The helper methods on this type walk the SSTs stored in
    /// each level's `table_infos`.
    pub input_ssts: Vec<InputLevel>,
    /// Key ranges attached to the task.
    // NOTE(review): presumably the ranges the output is partitioned by — confirm.
    pub splits: Vec<KeyRange>,
    /// SSTs recorded as the output of the task.
    pub sorted_output_ssts: Vec<SstableInfo>,
    /// Identifier of this task.
    pub task_id: u64,
    /// Level index that `is_trivial_move_task` compares against the second input level.
    pub target_level: u32,
    pub gc_delete_keys: bool,
    pub base_level: u32,
    /// Status of the task, stored as the protobuf enum.
    pub task_status: PbTaskStatus,
    /// Compaction group this task is associated with.
    pub compaction_group_id: CompactionGroupId,
    /// Version id of the compaction group; `is_expired` compares it with the expected one.
    pub compaction_group_version_id: u64,
    /// Table ids treated as existing by `is_trivial_reclaim` and `build_compact_table_ids`.
    pub existing_table_ids: Vec<TableId>,
    pub compression_algorithm: u32,
    pub target_file_size: u64,
    pub compaction_filter_mask: u32,
    /// Per-table options; `contains_ttl` inspects `retention_seconds` of each entry.
    pub table_options: BTreeMap<TableId, PbTableOption>,
    pub current_epoch_time: u64,
    pub target_sub_level_id: u64,
    /// Kind of the task, stored as the protobuf enum.
    pub task_type: PbTaskType,
    pub split_by_state_table: bool,
    pub split_weight_by_vnode: u32,
    /// Per-table `u32` partition value.
    // NOTE(review): exact meaning of the value is not visible in this file — confirm.
    pub table_vnode_partition: BTreeMap<TableId, u32>,
    /// Table watermarks whose `watermark_type` is `WatermarkSerdeType::PkPrefix`.
    pub pk_prefix_table_watermarks: BTreeMap<TableId, TableWatermarks>,

    /// Table watermarks whose `watermark_type` is `WatermarkSerdeType::NonPkPrefix`.
    pub non_pk_prefix_table_watermarks: BTreeMap<TableId, TableWatermarks>,
    /// Table watermarks whose `watermark_type` is `WatermarkSerdeType::Value`.
    pub value_table_watermarks: BTreeMap<TableId, TableWatermarks>,

    /// Per-table schema, copied verbatim from and to the protobuf message.
    pub table_schemas: BTreeMap<TableId, PbTableSchema>,

    pub max_sub_compaction: u32,

    /// Optional limit passed to `filter_utils::is_kv_count_too_large_for_xor16` by
    /// `should_use_block_based_filter`.
    pub max_kv_count_for_xor16: Option<u64>,
}
85
86impl CompactTask {
87 pub fn estimated_encode_len(&self) -> usize {
88 self.input_ssts
89 .iter()
90 .map(|input_level| input_level.estimated_encode_len())
91 .sum::<usize>()
92 + self
93 .splits
94 .iter()
95 .map(|split| split.left.len() + split.right.len() + size_of::<bool>())
96 .sum::<usize>()
97 + size_of::<u64>()
98 + self
99 .sorted_output_ssts
100 .iter()
101 .map(|sst| sst.estimated_encode_len())
102 .sum::<usize>()
103 + size_of::<u64>()
104 + size_of::<u32>()
105 + size_of::<bool>()
106 + size_of::<u32>()
107 + size_of::<i32>()
108 + size_of::<u64>()
109 + self.existing_table_ids.len() * size_of::<u32>()
110 + size_of::<u32>()
111 + size_of::<u64>()
112 + size_of::<u32>()
113 + self.table_options.len() * size_of::<u64>()
114 + size_of::<u64>()
115 + size_of::<u64>()
116 + size_of::<i32>()
117 + size_of::<bool>()
118 + size_of::<u32>()
119 + self.table_vnode_partition.len() * size_of::<u64>()
120 + self
121 .pk_prefix_table_watermarks
122 .values()
123 .map(|table_watermark| size_of::<u32>() + table_watermark.estimated_encode_len())
124 .sum::<usize>()
125 + self
126 .non_pk_prefix_table_watermarks
127 .values()
128 .map(|table_watermark| size_of::<u32>() + table_watermark.estimated_encode_len())
129 .sum::<usize>()
130 + self
131 .value_table_watermarks
132 .values()
133 .map(|table_watermark| size_of::<u32>() + table_watermark.estimated_encode_len())
134 .sum::<usize>()
135 }
136
137 pub fn is_trivial_move_task(&self) -> bool {
138 if self.task_type != TaskType::Dynamic && self.task_type != TaskType::Emergency {
139 return false;
140 }
141
142 if self.input_ssts.len() != 2 || self.input_ssts[0].level_type != LevelType::Nonoverlapping
143 {
144 return false;
145 }
146
147 if self.input_ssts[0].level_idx == self.input_ssts[1].level_idx
149 && self.input_ssts[0].level_idx > 0
150 {
151 return false;
152 }
153
154 if self.input_ssts[1].level_idx == self.target_level
155 && self.input_ssts[1].table_infos.is_empty()
156 {
157 return true;
158 }
159
160 false
161 }
162
163 pub fn is_trivial_reclaim(&self) -> bool {
164 if self.task_type == TaskType::VnodeWatermark {
166 return true;
167 }
168 let exist_table_ids =
169 HashSet::<TableId>::from_iter(self.existing_table_ids.iter().copied());
170 self.input_ssts.iter().all(|level| {
171 level.table_infos.iter().all(|sst| {
172 sst.table_ids
173 .iter()
174 .all(|table_id| !exist_table_ids.contains(table_id))
175 })
176 })
177 }
178}
179
180impl CompactTask {
181 pub fn contains_ttl(&self) -> bool {
183 self.table_options
184 .iter()
185 .any(|(_, table_option)| table_option.retention_seconds.is_some_and(|ttl| ttl > 0))
186 }
187
188 pub fn contains_range_tombstone(&self) -> bool {
190 self.input_ssts
191 .iter()
192 .flat_map(|level| level.table_infos.iter())
193 .any(|sst| sst.range_tombstone_count > 0)
194 }
195
196 pub fn contains_split_sst(&self) -> bool {
198 self.input_ssts
199 .iter()
200 .flat_map(|level| level.table_infos.iter())
201 .any(|sst| sst.sst_id.as_raw_id() != sst.object_id.as_raw_id())
202 }
203
204 pub fn get_table_ids_from_input_ssts(&self) -> impl Iterator<Item = StateTableId> + use<> {
205 self.input_ssts
206 .iter()
207 .flat_map(|level| level.table_infos.iter())
208 .flat_map(|sst| sst.table_ids.clone())
209 .sorted()
210 .dedup()
211 }
212
213 pub fn build_compact_table_ids(&self) -> Vec<StateTableId> {
215 let existing_table_ids: HashSet<TableId> =
216 HashSet::from_iter(self.existing_table_ids.clone());
217 self.get_table_ids_from_input_ssts()
218 .filter(|table_id| existing_table_ids.contains(table_id))
219 .collect()
220 }
221
222 pub fn is_expired(&self, compaction_group_version_id_expected: u64) -> bool {
223 is_compaction_task_expired(
224 self.compaction_group_version_id,
225 compaction_group_version_id_expected,
226 )
227 }
228
229 pub fn should_use_block_based_filter(&self) -> bool {
232 let kv_count = self
233 .input_ssts
234 .iter()
235 .flat_map(|level| level.table_infos.iter())
236 .map(|sst| sst.total_key_count)
237 .sum::<u64>();
238
239 crate::filter_utils::is_kv_count_too_large_for_xor16(kv_count, self.max_kv_count_for_xor16)
240 }
241}
242
243fn split_watermark_serde_types(
244 pb_compact_task: &PbCompactTask,
245) -> (
246 BTreeMap<TableId, TableWatermarks>,
247 BTreeMap<TableId, TableWatermarks>,
248 BTreeMap<TableId, TableWatermarks>,
249) {
250 let mut pk_prefix_table_watermarks = BTreeMap::default();
251 let mut non_pk_prefix_table_watermarks = BTreeMap::default();
252 let mut value_table_watermarks = BTreeMap::default();
253 for (table_id, pbwatermark) in &pb_compact_task.table_watermarks {
254 let watermark = TableWatermarks::from(pbwatermark);
255 match watermark.watermark_type {
256 WatermarkSerdeType::PkPrefix => {
257 pk_prefix_table_watermarks.insert(*table_id, watermark);
258 }
259 WatermarkSerdeType::NonPkPrefix => {
260 non_pk_prefix_table_watermarks.insert(*table_id, watermark);
261 }
262 WatermarkSerdeType::Value => {
263 value_table_watermarks.insert(*table_id, watermark);
264 }
265 }
266 }
267 (
268 pk_prefix_table_watermarks,
269 non_pk_prefix_table_watermarks,
270 value_table_watermarks,
271 )
272}
273
/// Returns `true` when the compaction group version id recorded in a task differs from the
/// expected one, and `false` when the two ids are equal.
pub fn is_compaction_task_expired(
    compaction_group_version_id_in_task: u64,
    compaction_group_version_id_expected: u64,
) -> bool {
    let version_is_current =
        compaction_group_version_id_in_task == compaction_group_version_id_expected;
    !version_is_current
}
280
281impl From<PbCompactTask> for CompactTask {
282 fn from(pb_compact_task: PbCompactTask) -> Self {
283 let (pk_prefix_table_watermarks, non_pk_prefix_table_watermarks, value_table_watermarks) =
284 split_watermark_serde_types(&pb_compact_task);
285 #[expect(deprecated)]
286 Self {
287 input_ssts: pb_compact_task
288 .input_ssts
289 .into_iter()
290 .map(InputLevel::from)
291 .collect_vec(),
292 splits: pb_compact_task
293 .splits
294 .into_iter()
295 .map(|pb_keyrange| KeyRange {
296 left: pb_keyrange.left.into(),
297 right: pb_keyrange.right.into(),
298 right_exclusive: pb_keyrange.right_exclusive,
299 })
300 .collect_vec(),
301 sorted_output_ssts: pb_compact_task
302 .sorted_output_ssts
303 .into_iter()
304 .map(SstableInfo::from)
305 .collect_vec(),
306 task_id: pb_compact_task.task_id,
307 target_level: pb_compact_task.target_level,
308 gc_delete_keys: pb_compact_task.gc_delete_keys,
309 base_level: pb_compact_task.base_level,
310 task_status: TaskStatus::try_from(pb_compact_task.task_status).unwrap(),
311 compaction_group_id: pb_compact_task.compaction_group_id,
312 existing_table_ids: pb_compact_task.existing_table_ids.clone(),
313 compression_algorithm: pb_compact_task.compression_algorithm,
314 target_file_size: pb_compact_task.target_file_size,
315 compaction_filter_mask: pb_compact_task.compaction_filter_mask,
316 table_options: pb_compact_task
317 .table_options
318 .iter()
319 .map(|(table_id, v)| (*table_id, *v))
320 .collect(),
321 current_epoch_time: pb_compact_task.current_epoch_time,
322 target_sub_level_id: pb_compact_task.target_sub_level_id,
323 task_type: PbTaskType::try_from(pb_compact_task.task_type).unwrap(),
324 split_by_state_table: pb_compact_task.split_by_state_table,
325 split_weight_by_vnode: pb_compact_task.split_weight_by_vnode,
326 table_vnode_partition: pb_compact_task
327 .table_vnode_partition
328 .iter()
329 .map(|(table_id, v)| (*table_id, *v))
330 .collect(),
331 pk_prefix_table_watermarks,
332 non_pk_prefix_table_watermarks,
333 value_table_watermarks,
334 table_schemas: pb_compact_task
335 .table_schemas
336 .iter()
337 .map(|(table_id, v)| (*table_id, v.clone()))
338 .collect(),
339 max_sub_compaction: pb_compact_task.max_sub_compaction,
340 compaction_group_version_id: pb_compact_task.compaction_group_version_id,
341 max_kv_count_for_xor16: pb_compact_task.max_kv_count_for_xor16,
342 }
343 }
344}
345
346impl From<&PbCompactTask> for CompactTask {
347 fn from(pb_compact_task: &PbCompactTask) -> Self {
348 let (pk_prefix_table_watermarks, non_pk_prefix_table_watermarks, value_table_watermarks) =
349 split_watermark_serde_types(pb_compact_task);
350 #[expect(deprecated)]
351 Self {
352 input_ssts: pb_compact_task
353 .input_ssts
354 .iter()
355 .map(InputLevel::from)
356 .collect_vec(),
357 splits: pb_compact_task
358 .splits
359 .iter()
360 .map(|pb_keyrange| KeyRange {
361 left: pb_keyrange.left.clone().into(),
362 right: pb_keyrange.right.clone().into(),
363 right_exclusive: pb_keyrange.right_exclusive,
364 })
365 .collect_vec(),
366 sorted_output_ssts: pb_compact_task
367 .sorted_output_ssts
368 .iter()
369 .map(SstableInfo::from)
370 .collect_vec(),
371 task_id: pb_compact_task.task_id,
372 target_level: pb_compact_task.target_level,
373 gc_delete_keys: pb_compact_task.gc_delete_keys,
374 base_level: pb_compact_task.base_level,
375 task_status: TaskStatus::try_from(pb_compact_task.task_status).unwrap(),
376 compaction_group_id: pb_compact_task.compaction_group_id,
377 existing_table_ids: pb_compact_task.existing_table_ids.clone(),
378 compression_algorithm: pb_compact_task.compression_algorithm,
379 target_file_size: pb_compact_task.target_file_size,
380 compaction_filter_mask: pb_compact_task.compaction_filter_mask,
381 table_options: pb_compact_task
382 .table_options
383 .iter()
384 .map(|(table_id, v)| (*table_id, *v))
385 .collect(),
386 current_epoch_time: pb_compact_task.current_epoch_time,
387 target_sub_level_id: pb_compact_task.target_sub_level_id,
388 task_type: PbTaskType::try_from(pb_compact_task.task_type).unwrap(),
389 split_by_state_table: pb_compact_task.split_by_state_table,
390 split_weight_by_vnode: pb_compact_task.split_weight_by_vnode,
391 table_vnode_partition: pb_compact_task
392 .table_vnode_partition
393 .iter()
394 .map(|(table_id, v)| (*table_id, *v))
395 .collect(),
396 pk_prefix_table_watermarks,
397 non_pk_prefix_table_watermarks,
398 value_table_watermarks,
399 table_schemas: pb_compact_task
400 .table_schemas
401 .iter()
402 .map(|(table_id, v)| (*table_id, v.clone()))
403 .collect(),
404 max_sub_compaction: pb_compact_task.max_sub_compaction,
405 compaction_group_version_id: pb_compact_task.compaction_group_version_id,
406 max_kv_count_for_xor16: pb_compact_task.max_kv_count_for_xor16,
407 }
408 }
409}
410
411impl From<CompactTask> for PbCompactTask {
412 fn from(compact_task: CompactTask) -> Self {
413 #[expect(deprecated)]
414 Self {
415 input_ssts: compact_task
416 .input_ssts
417 .into_iter()
418 .map(|input_level| input_level.into())
419 .collect_vec(),
420 splits: compact_task
421 .splits
422 .into_iter()
423 .map(|keyrange| PbKeyRange {
424 left: keyrange.left.into(),
425 right: keyrange.right.into(),
426 right_exclusive: keyrange.right_exclusive,
427 })
428 .collect_vec(),
429 sorted_output_ssts: compact_task
430 .sorted_output_ssts
431 .into_iter()
432 .map(|sst| sst.into())
433 .collect_vec(),
434 task_id: compact_task.task_id,
435 target_level: compact_task.target_level,
436 gc_delete_keys: compact_task.gc_delete_keys,
437 base_level: compact_task.base_level,
438 task_status: compact_task.task_status.into(),
439 compaction_group_id: compact_task.compaction_group_id,
440 existing_table_ids: compact_task.existing_table_ids.clone(),
441 compression_algorithm: compact_task.compression_algorithm,
442 target_file_size: compact_task.target_file_size,
443 compaction_filter_mask: compact_task.compaction_filter_mask,
444 table_options: compact_task.table_options.clone(),
445 current_epoch_time: compact_task.current_epoch_time,
446 target_sub_level_id: compact_task.target_sub_level_id,
447 task_type: compact_task.task_type.into(),
448 split_weight_by_vnode: compact_task.split_weight_by_vnode,
449 table_vnode_partition: compact_task.table_vnode_partition.clone(),
450 table_watermarks: compact_task
451 .pk_prefix_table_watermarks
452 .into_iter()
453 .chain(compact_task.non_pk_prefix_table_watermarks)
454 .chain(compact_task.value_table_watermarks)
455 .map(|(table_id, table_watermark)| (table_id, table_watermark.into()))
456 .collect(),
457 split_by_state_table: compact_task.split_by_state_table,
458 table_schemas: compact_task.table_schemas.clone(),
459 max_sub_compaction: compact_task.max_sub_compaction,
460 compaction_group_version_id: compact_task.compaction_group_version_id,
461 max_kv_count_for_xor16: compact_task.max_kv_count_for_xor16,
462 }
463 }
464}
465
466impl From<&CompactTask> for PbCompactTask {
467 fn from(compact_task: &CompactTask) -> Self {
468 #[expect(deprecated)]
469 Self {
470 input_ssts: compact_task
471 .input_ssts
472 .iter()
473 .map(|input_level| input_level.into())
474 .collect_vec(),
475 splits: compact_task
476 .splits
477 .iter()
478 .map(|keyrange| PbKeyRange {
479 left: keyrange.left.to_vec(),
480 right: keyrange.right.to_vec(),
481 right_exclusive: keyrange.right_exclusive,
482 })
483 .collect_vec(),
484 sorted_output_ssts: compact_task
485 .sorted_output_ssts
486 .iter()
487 .map(|sst| sst.into())
488 .collect_vec(),
489 task_id: compact_task.task_id,
490 target_level: compact_task.target_level,
491 gc_delete_keys: compact_task.gc_delete_keys,
492 base_level: compact_task.base_level,
493 task_status: compact_task.task_status.into(),
494 compaction_group_id: compact_task.compaction_group_id,
495 existing_table_ids: compact_task.existing_table_ids.clone(),
496 compression_algorithm: compact_task.compression_algorithm,
497 target_file_size: compact_task.target_file_size,
498 compaction_filter_mask: compact_task.compaction_filter_mask,
499 table_options: compact_task.table_options.clone(),
500 current_epoch_time: compact_task.current_epoch_time,
501 target_sub_level_id: compact_task.target_sub_level_id,
502 task_type: compact_task.task_type.into(),
503 split_weight_by_vnode: compact_task.split_weight_by_vnode,
504 table_vnode_partition: compact_task.table_vnode_partition.clone(),
505 table_watermarks: compact_task
506 .pk_prefix_table_watermarks
507 .iter()
508 .chain(compact_task.non_pk_prefix_table_watermarks.iter())
509 .chain(compact_task.value_table_watermarks.iter())
510 .map(|(table_id, table_watermark)| (*table_id, table_watermark.into()))
511 .collect(),
512 split_by_state_table: compact_task.split_by_state_table,
513 table_schemas: compact_task.table_schemas.clone(),
514 max_sub_compaction: compact_task.max_sub_compaction,
515 compaction_group_version_id: compact_task.compaction_group_version_id,
516 max_kv_count_for_xor16: compact_task.max_kv_count_for_xor16,
517 }
518 }
519}
520
/// In-memory form of a validation task, convertible to and from [`PbValidationTask`].
#[derive(Clone, PartialEq, Default)]
pub struct ValidationTask {
    /// SSTs carried by the task.
    pub sst_infos: Vec<SstableInfo>,
    /// Maps an SST object id to a worker id.
    // NOTE(review): presumably the worker that produced the object — confirm.
    pub sst_id_to_worker_id: HashMap<HummockSstableObjectId, WorkerId>,
}
526
527impl From<PbValidationTask> for ValidationTask {
528 fn from(pb_validation_task: PbValidationTask) -> Self {
529 Self {
530 sst_infos: pb_validation_task
531 .sst_infos
532 .into_iter()
533 .map(SstableInfo::from)
534 .collect_vec(),
535 sst_id_to_worker_id: pb_validation_task.sst_id_to_worker_id.into_iter().collect(),
536 }
537 }
538}
539
540impl From<ValidationTask> for PbValidationTask {
541 fn from(validation_task: ValidationTask) -> Self {
542 Self {
543 sst_infos: validation_task
544 .sst_infos
545 .into_iter()
546 .map(|sst| sst.into())
547 .collect_vec(),
548 sst_id_to_worker_id: validation_task.sst_id_to_worker_id,
549 }
550 }
551}
552
553impl ValidationTask {
554 pub fn estimated_encode_len(&self) -> usize {
555 self.sst_infos
556 .iter()
557 .map(|sst| sst.estimated_encode_len())
558 .sum::<usize>()
559 + self.sst_id_to_worker_id.len() * (size_of::<u64>() + size_of::<u32>())
560 + size_of::<u64>()
561 }
562}
563
/// In-memory form of a task report, convertible to and from [`PbReportTask`].
#[derive(Clone, PartialEq, Default, Debug)]
pub struct ReportTask {
    /// Per-table statistics carried by the report.
    pub table_stats_change: HashMap<TableId, PbTableStats>,
    /// Identifier of the reported task.
    pub task_id: u64,
    /// Status of the reported task.
    pub task_status: TaskStatus,
    /// SSTs recorded as the output of the reported task.
    pub sorted_output_ssts: Vec<SstableInfo>,
    /// Maps an SST object id to a `u64` timestamp.
    // NOTE(review): unit and origin of the timestamp are not visible in this file — confirm.
    pub object_timestamps: HashMap<HummockSstableObjectId, u64>,
}
572
573impl From<PbReportTask> for ReportTask {
574 fn from(value: PbReportTask) -> Self {
575 Self {
576 table_stats_change: value.table_stats_change.clone(),
577 task_id: value.task_id,
578 task_status: PbTaskStatus::try_from(value.task_status).unwrap(),
579 sorted_output_ssts: value
580 .sorted_output_ssts
581 .into_iter()
582 .map(SstableInfo::from)
583 .collect_vec(),
584 object_timestamps: value.object_timestamps,
585 }
586 }
587}
588
589impl From<ReportTask> for PbReportTask {
590 fn from(value: ReportTask) -> Self {
591 Self {
592 table_stats_change: value.table_stats_change.clone(),
593 task_id: value.task_id,
594 task_status: value.task_status.into(),
595 sorted_output_ssts: value
596 .sorted_output_ssts
597 .into_iter()
598 .map(|sst| sst.into())
599 .collect_vec(),
600 object_timestamps: value.object_timestamps,
601 }
602 }
603}