1use std::collections::{BTreeMap, HashMap, HashSet};
16use std::mem::size_of;
17
18use itertools::Itertools;
19use risingwave_common::catalog::TableId;
20use risingwave_pb::hummock::compact_task::{PbTaskStatus, PbTaskType, TaskStatus, TaskType};
21use risingwave_pb::hummock::subscribe_compaction_event_request::PbReportTask;
22use risingwave_pb::hummock::{
23 LevelType, PbCompactTask, PbKeyRange, PbTableOption, PbTableSchema, PbTableStats,
24 PbValidationTask,
25};
26
27use crate::HummockSstableObjectId;
28use crate::compaction_group::StateTableId;
29use crate::key_range::KeyRange;
30use crate::level::InputLevel;
31use crate::sstable_info::SstableInfo;
32use crate::table_watermark::{TableWatermarks, WatermarkSerdeType};
33
/// In-memory form of a hummock compaction task.
///
/// Converted to and from `PbCompactTask` by the `From` impls in this file. On the way in,
/// the protobuf `table_watermarks` map is split by `WatermarkSerdeType` into
/// `pk_prefix_table_watermarks` and `non_pk_prefix_table_watermarks`. On the way out,
/// the two maps are merged back into one.
#[derive(Clone, PartialEq, Default, Debug)]
pub struct CompactTask {
    /// Input levels; each level's `table_infos` are the SSTs this task reads.
    pub input_ssts: Vec<InputLevel>,
    /// Key ranges the task is split into (presumably one per sub-compaction — TODO confirm).
    pub splits: Vec<KeyRange>,
    /// SSTs produced by the task (the name suggests key order — confirm with the producer).
    pub sorted_output_ssts: Vec<SstableInfo>,
    /// Identifier of this task.
    pub task_id: u64,
    /// Output level; `is_trivial_move_task` compares it against the lower input level.
    pub target_level: u32,
    /// Whether delete keys may be dropped (per the field name — confirm in the compactor).
    pub gc_delete_keys: bool,
    /// Base level of the compaction group (per the field name — confirm).
    pub base_level: u32,
    /// Status of the task.
    pub task_status: PbTaskStatus,
    /// Compaction group this task belongs to.
    pub compaction_group_id: u64,
    /// Compaction group version recorded in the task; `is_expired` treats any mismatch
    /// with the expected version as expiry.
    pub compaction_group_version_id: u64,
    /// Table ids considered existing. `build_compact_table_ids` keeps only input table ids
    /// listed here. `is_trivial_reclaim` holds when no input SST references any of them.
    pub existing_table_ids: Vec<TableId>,
    /// Compression algorithm identifier (the encoding is defined elsewhere).
    pub compression_algorithm: u32,
    /// Target output file size (units not shown here; presumably bytes).
    pub target_file_size: u64,
    /// Compaction filter selection mask (per the field name — confirm).
    pub compaction_filter_mask: u32,
    /// Per-table options; `contains_ttl` inspects their `retention_seconds`.
    pub table_options: BTreeMap<TableId, PbTableOption>,
    /// Epoch time supplied with the task (units not shown here).
    pub current_epoch_time: u64,
    /// Sub-level id for the output (per the field name — confirm).
    pub target_sub_level_id: u64,
    /// Kind of task. `is_trivial_move_task` only considers `Dynamic`/`Emergency`.
    /// `VnodeWatermark` tasks are always reported as trivial reclaims.
    pub task_type: PbTaskType,
    /// Whether output is split by state table (per the field name — confirm).
    pub split_by_state_table: bool,
    /// Vnode-based split weight (per the field name — confirm).
    pub split_weight_by_vnode: u32,
    /// Per-table vnode partition value (per the field name — confirm).
    pub table_vnode_partition: BTreeMap<TableId, u32>,
    /// Table watermarks whose `watermark_type` is `WatermarkSerdeType::PkPrefix`.
    pub pk_prefix_table_watermarks: BTreeMap<TableId, TableWatermarks>,

    /// Table watermarks of every other `WatermarkSerdeType`.
    pub non_pk_prefix_table_watermarks: BTreeMap<TableId, TableWatermarks>,

    /// Per-table schemas carried with the task.
    pub table_schemas: BTreeMap<TableId, PbTableSchema>,

    /// Upper bound on sub-compactions (per the field name — confirm).
    pub max_sub_compaction: u32,

    /// Limit passed to `is_kv_count_too_large_for_xor16` by `should_use_block_based_filter`.
    /// The meaning of `None` is defined by that helper.
    pub max_kv_count_for_xor16: Option<u64>,
}
83
84impl CompactTask {
85 pub fn estimated_encode_len(&self) -> usize {
86 self.input_ssts
87 .iter()
88 .map(|input_level| input_level.estimated_encode_len())
89 .sum::<usize>()
90 + self
91 .splits
92 .iter()
93 .map(|split| split.left.len() + split.right.len() + size_of::<bool>())
94 .sum::<usize>()
95 + size_of::<u64>()
96 + self
97 .sorted_output_ssts
98 .iter()
99 .map(|sst| sst.estimated_encode_len())
100 .sum::<usize>()
101 + size_of::<u64>()
102 + size_of::<u32>()
103 + size_of::<bool>()
104 + size_of::<u32>()
105 + size_of::<i32>()
106 + size_of::<u64>()
107 + self.existing_table_ids.len() * size_of::<u32>()
108 + size_of::<u32>()
109 + size_of::<u64>()
110 + size_of::<u32>()
111 + self.table_options.len() * size_of::<u64>()
112 + size_of::<u64>()
113 + size_of::<u64>()
114 + size_of::<i32>()
115 + size_of::<bool>()
116 + size_of::<u32>()
117 + self.table_vnode_partition.len() * size_of::<u64>()
118 + self
119 .pk_prefix_table_watermarks
120 .values()
121 .map(|table_watermark| size_of::<u32>() + table_watermark.estimated_encode_len())
122 .sum::<usize>()
123 + self
124 .non_pk_prefix_table_watermarks
125 .values()
126 .map(|table_watermark| size_of::<u32>() + table_watermark.estimated_encode_len())
127 .sum::<usize>()
128 }
129
130 pub fn is_trivial_move_task(&self) -> bool {
131 if self.task_type != TaskType::Dynamic && self.task_type != TaskType::Emergency {
132 return false;
133 }
134
135 if self.input_ssts.len() != 2 || self.input_ssts[0].level_type != LevelType::Nonoverlapping
136 {
137 return false;
138 }
139
140 if self.input_ssts[0].level_idx == self.input_ssts[1].level_idx
142 && self.input_ssts[0].level_idx > 0
143 {
144 return false;
145 }
146
147 if self.input_ssts[1].level_idx == self.target_level
148 && self.input_ssts[1].table_infos.is_empty()
149 {
150 return true;
151 }
152
153 false
154 }
155
156 pub fn is_trivial_reclaim(&self) -> bool {
157 if self.task_type == TaskType::VnodeWatermark {
159 return true;
160 }
161 let exist_table_ids =
162 HashSet::<TableId>::from_iter(self.existing_table_ids.iter().copied());
163 self.input_ssts.iter().all(|level| {
164 level.table_infos.iter().all(|sst| {
165 sst.table_ids
166 .iter()
167 .all(|table_id| !exist_table_ids.contains(table_id))
168 })
169 })
170 }
171}
172
173impl CompactTask {
174 pub fn contains_ttl(&self) -> bool {
176 self.table_options
177 .iter()
178 .any(|(_, table_option)| table_option.retention_seconds.is_some_and(|ttl| ttl > 0))
179 }
180
181 pub fn contains_range_tombstone(&self) -> bool {
183 self.input_ssts
184 .iter()
185 .flat_map(|level| level.table_infos.iter())
186 .any(|sst| sst.range_tombstone_count > 0)
187 }
188
189 pub fn contains_split_sst(&self) -> bool {
191 self.input_ssts
192 .iter()
193 .flat_map(|level| level.table_infos.iter())
194 .any(|sst| sst.sst_id.inner() != sst.object_id.inner())
195 }
196
197 pub fn get_table_ids_from_input_ssts(&self) -> impl Iterator<Item = StateTableId> + use<> {
198 self.input_ssts
199 .iter()
200 .flat_map(|level| level.table_infos.iter())
201 .flat_map(|sst| sst.table_ids.clone())
202 .sorted()
203 .dedup()
204 }
205
206 pub fn build_compact_table_ids(&self) -> Vec<StateTableId> {
208 let existing_table_ids: HashSet<TableId> =
209 HashSet::from_iter(self.existing_table_ids.clone());
210 self.get_table_ids_from_input_ssts()
211 .filter(|table_id| existing_table_ids.contains(table_id))
212 .collect()
213 }
214
215 pub fn is_expired(&self, compaction_group_version_id_expected: u64) -> bool {
216 is_compaction_task_expired(
217 self.compaction_group_version_id,
218 compaction_group_version_id_expected,
219 )
220 }
221
222 pub fn should_use_block_based_filter(&self) -> bool {
225 let kv_count = self
226 .input_ssts
227 .iter()
228 .flat_map(|level| level.table_infos.iter())
229 .map(|sst| sst.total_key_count)
230 .sum::<u64>();
231
232 crate::filter_utils::is_kv_count_too_large_for_xor16(kv_count, self.max_kv_count_for_xor16)
233 }
234}
235
/// Reports whether a compaction task is stale.
///
/// A task counts as expired as soon as the compaction group version it recorded
/// (`compaction_group_version_id_in_task`) no longer equals the version the caller
/// expects (`compaction_group_version_id_expected`).
pub fn is_compaction_task_expired(
    compaction_group_version_id_in_task: u64,
    compaction_group_version_id_expected: u64,
) -> bool {
    let version_still_current =
        compaction_group_version_id_in_task == compaction_group_version_id_expected;
    !version_still_current
}
242
243impl From<PbCompactTask> for CompactTask {
244 fn from(pb_compact_task: PbCompactTask) -> Self {
245 let (pk_prefix_table_watermarks, non_pk_prefix_table_watermarks) = pb_compact_task
246 .table_watermarks
247 .into_iter()
248 .map(|(table_id, pb_table_watermark)| {
249 let table_watermark = TableWatermarks::from(pb_table_watermark);
250 (table_id, table_watermark)
251 })
252 .partition(|(_table_id, table_watermarke)| {
253 matches!(
254 table_watermarke.watermark_type,
255 WatermarkSerdeType::PkPrefix
256 )
257 });
258
259 #[expect(deprecated)]
260 Self {
261 input_ssts: pb_compact_task
262 .input_ssts
263 .into_iter()
264 .map(InputLevel::from)
265 .collect_vec(),
266 splits: pb_compact_task
267 .splits
268 .into_iter()
269 .map(|pb_keyrange| KeyRange {
270 left: pb_keyrange.left.into(),
271 right: pb_keyrange.right.into(),
272 right_exclusive: pb_keyrange.right_exclusive,
273 })
274 .collect_vec(),
275 sorted_output_ssts: pb_compact_task
276 .sorted_output_ssts
277 .into_iter()
278 .map(SstableInfo::from)
279 .collect_vec(),
280 task_id: pb_compact_task.task_id,
281 target_level: pb_compact_task.target_level,
282 gc_delete_keys: pb_compact_task.gc_delete_keys,
283 base_level: pb_compact_task.base_level,
284 task_status: TaskStatus::try_from(pb_compact_task.task_status).unwrap(),
285 compaction_group_id: pb_compact_task.compaction_group_id,
286 existing_table_ids: pb_compact_task.existing_table_ids.clone(),
287 compression_algorithm: pb_compact_task.compression_algorithm,
288 target_file_size: pb_compact_task.target_file_size,
289 compaction_filter_mask: pb_compact_task.compaction_filter_mask,
290 table_options: pb_compact_task
291 .table_options
292 .iter()
293 .map(|(table_id, v)| (*table_id, *v))
294 .collect(),
295 current_epoch_time: pb_compact_task.current_epoch_time,
296 target_sub_level_id: pb_compact_task.target_sub_level_id,
297 task_type: PbTaskType::try_from(pb_compact_task.task_type).unwrap(),
298 split_by_state_table: pb_compact_task.split_by_state_table,
299 split_weight_by_vnode: pb_compact_task.split_weight_by_vnode,
300 table_vnode_partition: pb_compact_task
301 .table_vnode_partition
302 .iter()
303 .map(|(table_id, v)| (*table_id, *v))
304 .collect(),
305 pk_prefix_table_watermarks,
306 non_pk_prefix_table_watermarks,
307 table_schemas: pb_compact_task
308 .table_schemas
309 .iter()
310 .map(|(table_id, v)| (*table_id, v.clone()))
311 .collect(),
312 max_sub_compaction: pb_compact_task.max_sub_compaction,
313 compaction_group_version_id: pb_compact_task.compaction_group_version_id,
314 max_kv_count_for_xor16: pb_compact_task.max_kv_count_for_xor16,
315 }
316 }
317}
318
319impl From<&PbCompactTask> for CompactTask {
320 fn from(pb_compact_task: &PbCompactTask) -> Self {
321 let (pk_prefix_table_watermarks, non_pk_prefix_table_watermarks) = pb_compact_task
322 .table_watermarks
323 .iter()
324 .map(|(table_id, pb_table_watermark)| {
325 let table_watermark = TableWatermarks::from(pb_table_watermark);
326 (*table_id, table_watermark)
327 })
328 .partition(|(_table_id, table_watermarke)| {
329 matches!(
330 table_watermarke.watermark_type,
331 WatermarkSerdeType::PkPrefix
332 )
333 });
334
335 #[expect(deprecated)]
336 Self {
337 input_ssts: pb_compact_task
338 .input_ssts
339 .iter()
340 .map(InputLevel::from)
341 .collect_vec(),
342 splits: pb_compact_task
343 .splits
344 .iter()
345 .map(|pb_keyrange| KeyRange {
346 left: pb_keyrange.left.clone().into(),
347 right: pb_keyrange.right.clone().into(),
348 right_exclusive: pb_keyrange.right_exclusive,
349 })
350 .collect_vec(),
351 sorted_output_ssts: pb_compact_task
352 .sorted_output_ssts
353 .iter()
354 .map(SstableInfo::from)
355 .collect_vec(),
356 task_id: pb_compact_task.task_id,
357 target_level: pb_compact_task.target_level,
358 gc_delete_keys: pb_compact_task.gc_delete_keys,
359 base_level: pb_compact_task.base_level,
360 task_status: TaskStatus::try_from(pb_compact_task.task_status).unwrap(),
361 compaction_group_id: pb_compact_task.compaction_group_id,
362 existing_table_ids: pb_compact_task.existing_table_ids.clone(),
363 compression_algorithm: pb_compact_task.compression_algorithm,
364 target_file_size: pb_compact_task.target_file_size,
365 compaction_filter_mask: pb_compact_task.compaction_filter_mask,
366 table_options: pb_compact_task
367 .table_options
368 .iter()
369 .map(|(table_id, v)| (*table_id, *v))
370 .collect(),
371 current_epoch_time: pb_compact_task.current_epoch_time,
372 target_sub_level_id: pb_compact_task.target_sub_level_id,
373 task_type: PbTaskType::try_from(pb_compact_task.task_type).unwrap(),
374 split_by_state_table: pb_compact_task.split_by_state_table,
375 split_weight_by_vnode: pb_compact_task.split_weight_by_vnode,
376 table_vnode_partition: pb_compact_task
377 .table_vnode_partition
378 .iter()
379 .map(|(table_id, v)| (*table_id, *v))
380 .collect(),
381 pk_prefix_table_watermarks,
382 non_pk_prefix_table_watermarks,
383 table_schemas: pb_compact_task
384 .table_schemas
385 .iter()
386 .map(|(table_id, v)| (*table_id, v.clone()))
387 .collect(),
388 max_sub_compaction: pb_compact_task.max_sub_compaction,
389 compaction_group_version_id: pb_compact_task.compaction_group_version_id,
390 max_kv_count_for_xor16: pb_compact_task.max_kv_count_for_xor16,
391 }
392 }
393}
394
395impl From<CompactTask> for PbCompactTask {
396 fn from(compact_task: CompactTask) -> Self {
397 #[expect(deprecated)]
398 Self {
399 input_ssts: compact_task
400 .input_ssts
401 .into_iter()
402 .map(|input_level| input_level.into())
403 .collect_vec(),
404 splits: compact_task
405 .splits
406 .into_iter()
407 .map(|keyrange| PbKeyRange {
408 left: keyrange.left.into(),
409 right: keyrange.right.into(),
410 right_exclusive: keyrange.right_exclusive,
411 })
412 .collect_vec(),
413 sorted_output_ssts: compact_task
414 .sorted_output_ssts
415 .into_iter()
416 .map(|sst| sst.into())
417 .collect_vec(),
418 task_id: compact_task.task_id,
419 target_level: compact_task.target_level,
420 gc_delete_keys: compact_task.gc_delete_keys,
421 base_level: compact_task.base_level,
422 task_status: compact_task.task_status.into(),
423 compaction_group_id: compact_task.compaction_group_id,
424 existing_table_ids: compact_task.existing_table_ids.clone(),
425 compression_algorithm: compact_task.compression_algorithm,
426 target_file_size: compact_task.target_file_size,
427 compaction_filter_mask: compact_task.compaction_filter_mask,
428 table_options: compact_task.table_options.clone(),
429 current_epoch_time: compact_task.current_epoch_time,
430 target_sub_level_id: compact_task.target_sub_level_id,
431 task_type: compact_task.task_type.into(),
432 split_weight_by_vnode: compact_task.split_weight_by_vnode,
433 table_vnode_partition: compact_task.table_vnode_partition.clone(),
434 table_watermarks: compact_task
435 .pk_prefix_table_watermarks
436 .into_iter()
437 .chain(compact_task.non_pk_prefix_table_watermarks)
438 .map(|(table_id, table_watermark)| (table_id, table_watermark.into()))
439 .collect(),
440 split_by_state_table: compact_task.split_by_state_table,
441 table_schemas: compact_task.table_schemas.clone(),
442 max_sub_compaction: compact_task.max_sub_compaction,
443 compaction_group_version_id: compact_task.compaction_group_version_id,
444 max_kv_count_for_xor16: compact_task.max_kv_count_for_xor16,
445 }
446 }
447}
448
449impl From<&CompactTask> for PbCompactTask {
450 fn from(compact_task: &CompactTask) -> Self {
451 #[expect(deprecated)]
452 Self {
453 input_ssts: compact_task
454 .input_ssts
455 .iter()
456 .map(|input_level| input_level.into())
457 .collect_vec(),
458 splits: compact_task
459 .splits
460 .iter()
461 .map(|keyrange| PbKeyRange {
462 left: keyrange.left.to_vec(),
463 right: keyrange.right.to_vec(),
464 right_exclusive: keyrange.right_exclusive,
465 })
466 .collect_vec(),
467 sorted_output_ssts: compact_task
468 .sorted_output_ssts
469 .iter()
470 .map(|sst| sst.into())
471 .collect_vec(),
472 task_id: compact_task.task_id,
473 target_level: compact_task.target_level,
474 gc_delete_keys: compact_task.gc_delete_keys,
475 base_level: compact_task.base_level,
476 task_status: compact_task.task_status.into(),
477 compaction_group_id: compact_task.compaction_group_id,
478 existing_table_ids: compact_task.existing_table_ids.clone(),
479 compression_algorithm: compact_task.compression_algorithm,
480 target_file_size: compact_task.target_file_size,
481 compaction_filter_mask: compact_task.compaction_filter_mask,
482 table_options: compact_task.table_options.clone(),
483 current_epoch_time: compact_task.current_epoch_time,
484 target_sub_level_id: compact_task.target_sub_level_id,
485 task_type: compact_task.task_type.into(),
486 split_weight_by_vnode: compact_task.split_weight_by_vnode,
487 table_vnode_partition: compact_task.table_vnode_partition.clone(),
488 table_watermarks: compact_task
489 .pk_prefix_table_watermarks
490 .iter()
491 .chain(compact_task.non_pk_prefix_table_watermarks.iter())
492 .map(|(table_id, table_watermark)| (*table_id, table_watermark.into()))
493 .collect(),
494 split_by_state_table: compact_task.split_by_state_table,
495 table_schemas: compact_task.table_schemas.clone(),
496 max_sub_compaction: compact_task.max_sub_compaction,
497 compaction_group_version_id: compact_task.compaction_group_version_id,
498 max_kv_count_for_xor16: compact_task.max_kv_count_for_xor16,
499 }
500 }
501}
502
/// Validation task payload: a set of SSTs plus a map from SST object id to worker id.
///
/// Converted to and from `PbValidationTask` by the `From` impls in this file.
#[derive(Clone, PartialEq, Default)]
pub struct ValidationTask {
    /// SSTs covered by this task.
    pub sst_infos: Vec<SstableInfo>,
    /// Maps each SST object id to a worker id (presumably the worker that wrote it — confirm).
    pub sst_id_to_worker_id: HashMap<HummockSstableObjectId, u32>,
}
508
509impl From<PbValidationTask> for ValidationTask {
510 fn from(pb_validation_task: PbValidationTask) -> Self {
511 Self {
512 sst_infos: pb_validation_task
513 .sst_infos
514 .into_iter()
515 .map(SstableInfo::from)
516 .collect_vec(),
517 sst_id_to_worker_id: pb_validation_task
518 .sst_id_to_worker_id
519 .into_iter()
520 .map(|(object_id, worker_id)| (object_id.into(), worker_id))
521 .collect(),
522 }
523 }
524}
525
526impl From<ValidationTask> for PbValidationTask {
527 fn from(validation_task: ValidationTask) -> Self {
528 Self {
529 sst_infos: validation_task
530 .sst_infos
531 .into_iter()
532 .map(|sst| sst.into())
533 .collect_vec(),
534 sst_id_to_worker_id: validation_task
535 .sst_id_to_worker_id
536 .into_iter()
537 .map(|(object_id, worker_id)| (object_id.inner(), worker_id))
538 .collect(),
539 }
540 }
541}
542
543impl ValidationTask {
544 pub fn estimated_encode_len(&self) -> usize {
545 self.sst_infos
546 .iter()
547 .map(|sst| sst.estimated_encode_len())
548 .sum::<usize>()
549 + self.sst_id_to_worker_id.len() * (size_of::<u64>() + size_of::<u32>())
550 + size_of::<u64>()
551 }
552}
553
/// Report for a compaction task: its id and status, the output SSTs, per-table stats
/// changes, and per-object timestamps.
///
/// Converted to and from `PbReportTask` by the `From` impls in this file.
#[derive(Clone, PartialEq, Default, Debug)]
pub struct ReportTask {
    /// Per-table statistics changes.
    pub table_stats_change: HashMap<TableId, PbTableStats>,
    /// Id of the reported task.
    pub task_id: u64,
    /// Status of the reported task.
    pub task_status: TaskStatus,
    /// Output SSTs of the task.
    pub sorted_output_ssts: Vec<SstableInfo>,
    /// Timestamps keyed by SST object id (units not shown here).
    pub object_timestamps: HashMap<HummockSstableObjectId, u64>,
}
562
563impl From<PbReportTask> for ReportTask {
564 fn from(value: PbReportTask) -> Self {
565 Self {
566 table_stats_change: value.table_stats_change.clone(),
567 task_id: value.task_id,
568 task_status: PbTaskStatus::try_from(value.task_status).unwrap(),
569 sorted_output_ssts: value
570 .sorted_output_ssts
571 .into_iter()
572 .map(SstableInfo::from)
573 .collect_vec(),
574 object_timestamps: value
575 .object_timestamps
576 .into_iter()
577 .map(|(object_id, timestamp)| (object_id.into(), timestamp))
578 .collect(),
579 }
580 }
581}
582
583impl From<ReportTask> for PbReportTask {
584 fn from(value: ReportTask) -> Self {
585 Self {
586 table_stats_change: value.table_stats_change.clone(),
587 task_id: value.task_id,
588 task_status: value.task_status.into(),
589 sorted_output_ssts: value
590 .sorted_output_ssts
591 .into_iter()
592 .map(|sst| sst.into())
593 .collect_vec(),
594 object_timestamps: value
595 .object_timestamps
596 .into_iter()
597 .map(|(object_id, timestamp)| (object_id.inner(), timestamp))
598 .collect(),
599 }
600 }
601}