1use std::collections::{BTreeMap, HashMap, HashSet};
16use std::mem::size_of;
17
18use itertools::Itertools;
19use risingwave_common::catalog::TableId;
20use risingwave_pb::hummock::compact_task::{PbTaskStatus, PbTaskType, TaskStatus, TaskType};
21use risingwave_pb::hummock::subscribe_compaction_event_request::PbReportTask;
22use risingwave_pb::hummock::{
23 LevelType, PbCompactTask, PbKeyRange, PbTableOption, PbTableSchema, PbTableStats,
24 PbValidationTask,
25};
26use risingwave_pb::id::WorkerId;
27
28use crate::compaction_group::StateTableId;
29use crate::key_range::KeyRange;
30use crate::level::InputLevel;
31use crate::sstable_info::SstableInfo;
32use crate::table_watermark::{TableWatermarks, WatermarkSerdeType};
33use crate::{CompactionGroupId, HummockSstableObjectId};
34
/// In-memory form of a compaction task, convertible to and from [`PbCompactTask`].
///
/// Table watermarks are kept in three maps, one per [`WatermarkSerdeType`]; the protobuf
/// form carries all of them in a single `table_watermarks` map.
#[derive(Clone, PartialEq, Default, Debug)]
pub struct CompactTask {
    /// Input levels of the task; each level lists its SSTs in `table_infos`.
    pub input_ssts: Vec<InputLevel>,
    /// Key ranges attached to the task.
    /// NOTE(review): presumably these partition the compaction output — confirm with the
    /// compactor.
    pub splits: Vec<KeyRange>,
    /// Output SSTs of the task.
    /// NOTE(review): the name suggests an ordering, but the sort key is not visible here.
    pub sorted_output_ssts: Vec<SstableInfo>,
    /// Identifier of this task.
    pub task_id: u64,
    /// Target level; `is_trivial_move_task` compares it with the second input level.
    pub target_level: u32,
    /// Copied verbatim to and from the protobuf task.
    /// NOTE(review): presumably allows dropping delete markers — confirm.
    pub gc_delete_keys: bool,
    /// Copied verbatim to and from the protobuf task.
    pub base_level: u32,
    /// Status of the task; decoded from the protobuf integer on conversion.
    pub task_status: PbTaskStatus,
    /// Compaction group this task belongs to.
    pub compaction_group_id: CompactionGroupId,
    /// Version id of the compaction group recorded in the task; `is_expired` compares it
    /// with the expected one.
    pub compaction_group_version_id: u64,
    /// Table ids treated as still existing by `is_trivial_reclaim` and
    /// `build_compact_table_ids`.
    pub existing_table_ids: Vec<TableId>,
    /// Copied verbatim to and from the protobuf task.
    pub compression_algorithm: u32,
    /// Copied verbatim to and from the protobuf task.
    pub target_file_size: u64,
    /// Copied verbatim to and from the protobuf task.
    pub compaction_filter_mask: u32,
    /// Per-table options; `contains_ttl` inspects their `retention_seconds`.
    pub table_options: BTreeMap<TableId, PbTableOption>,
    /// Copied verbatim to and from the protobuf task.
    pub current_epoch_time: u64,
    /// Copied verbatim to and from the protobuf task.
    pub target_sub_level_id: u64,
    /// Kind of task; consulted by `is_trivial_move_task` and `is_trivial_reclaim`.
    pub task_type: PbTaskType,
    /// Copied verbatim to and from the protobuf task.
    pub split_by_state_table: bool,
    /// Copied verbatim to and from the protobuf task.
    pub split_weight_by_vnode: u32,
    /// Per-table `u32` value, copied verbatim to and from the protobuf task.
    /// NOTE(review): presumably a vnode partition count per table — confirm.
    pub table_vnode_partition: BTreeMap<TableId, u32>,
    /// Watermarks whose `watermark_type` is [`WatermarkSerdeType::PkPrefix`].
    pub pk_prefix_table_watermarks: BTreeMap<TableId, TableWatermarks>,

    /// Watermarks whose `watermark_type` is [`WatermarkSerdeType::NonPkPrefix`].
    pub non_pk_prefix_table_watermarks: BTreeMap<TableId, TableWatermarks>,
    /// Watermarks whose `watermark_type` is [`WatermarkSerdeType::Value`].
    pub value_table_watermarks: BTreeMap<TableId, TableWatermarks>,

    /// Per-table schemas, copied verbatim to and from the protobuf task.
    pub table_schemas: BTreeMap<TableId, PbTableSchema>,

    /// Copied verbatim to and from the protobuf task.
    pub max_sub_compaction: u32,

    /// Optional threshold handed to `filter_utils::is_kv_count_too_large_for_xor16` by
    /// `should_use_block_based_filter`.
    pub max_kv_count_for_xor16: Option<u64>,

    /// Optional byte limit; `effective_max_vnode_key_range_bytes` ignores `None` and `0`.
    pub max_vnode_key_range_bytes: Option<u64>,
}
87
88impl CompactTask {
89 pub fn estimated_encode_len(&self) -> usize {
90 self.input_ssts
91 .iter()
92 .map(|input_level| input_level.estimated_encode_len())
93 .sum::<usize>()
94 + self
95 .splits
96 .iter()
97 .map(|split| split.left.len() + split.right.len() + size_of::<bool>())
98 .sum::<usize>()
99 + size_of::<u64>()
100 + self
101 .sorted_output_ssts
102 .iter()
103 .map(|sst| sst.estimated_encode_len())
104 .sum::<usize>()
105 + size_of::<u64>()
106 + size_of::<u32>()
107 + size_of::<bool>()
108 + size_of::<u32>()
109 + size_of::<i32>()
110 + size_of::<u64>()
111 + self.existing_table_ids.len() * size_of::<u32>()
112 + size_of::<u32>()
113 + size_of::<u64>()
114 + size_of::<u32>()
115 + self.table_options.len() * size_of::<u64>()
116 + size_of::<u64>()
117 + size_of::<u64>()
118 + size_of::<i32>()
119 + size_of::<bool>()
120 + size_of::<u32>()
121 + self.table_vnode_partition.len() * size_of::<u64>()
122 + self
123 .pk_prefix_table_watermarks
124 .values()
125 .map(|table_watermark| size_of::<u32>() + table_watermark.estimated_encode_len())
126 .sum::<usize>()
127 + self
128 .non_pk_prefix_table_watermarks
129 .values()
130 .map(|table_watermark| size_of::<u32>() + table_watermark.estimated_encode_len())
131 .sum::<usize>()
132 + self
133 .max_vnode_key_range_bytes
134 .map(|_| size_of::<u32>())
135 .unwrap_or_default()
136 + self
137 .value_table_watermarks
138 .values()
139 .map(|table_watermark| size_of::<u32>() + table_watermark.estimated_encode_len())
140 .sum::<usize>()
141 }
142
143 pub fn is_trivial_move_task(&self) -> bool {
144 if self.task_type != TaskType::Dynamic && self.task_type != TaskType::Emergency {
145 return false;
146 }
147
148 if self.input_ssts.len() != 2 || self.input_ssts[0].level_type != LevelType::Nonoverlapping
149 {
150 return false;
151 }
152
153 if self.input_ssts[0].level_idx == self.input_ssts[1].level_idx
155 && self.input_ssts[0].level_idx > 0
156 {
157 return false;
158 }
159
160 if self.input_ssts[1].level_idx == self.target_level
161 && self.input_ssts[1].table_infos.is_empty()
162 {
163 return true;
164 }
165
166 false
167 }
168
169 pub fn is_trivial_reclaim(&self) -> bool {
170 if self.task_type == TaskType::VnodeWatermark {
172 return true;
173 }
174 let exist_table_ids =
175 HashSet::<TableId>::from_iter(self.existing_table_ids.iter().copied());
176 self.input_ssts.iter().all(|level| {
177 level.table_infos.iter().all(|sst| {
178 sst.table_ids
179 .iter()
180 .all(|table_id| !exist_table_ids.contains(table_id))
181 })
182 })
183 }
184}
185
186impl CompactTask {
187 pub fn contains_ttl(&self) -> bool {
189 self.table_options
190 .iter()
191 .any(|(_, table_option)| table_option.retention_seconds.is_some_and(|ttl| ttl > 0))
192 }
193
194 pub fn contains_range_tombstone(&self) -> bool {
196 self.input_ssts
197 .iter()
198 .flat_map(|level| level.table_infos.iter())
199 .any(|sst| sst.range_tombstone_count > 0)
200 }
201
202 pub fn contains_split_sst(&self) -> bool {
204 self.input_ssts
205 .iter()
206 .flat_map(|level| level.table_infos.iter())
207 .any(|sst| sst.sst_id.as_raw_id() != sst.object_id.as_raw_id())
208 }
209
210 pub fn get_table_ids_from_input_ssts(&self) -> impl Iterator<Item = StateTableId> + use<> {
211 self.input_ssts
212 .iter()
213 .flat_map(|level| level.table_infos.iter())
214 .flat_map(|sst| sst.table_ids.clone())
215 .sorted()
216 .dedup()
217 }
218
219 pub fn build_compact_table_ids(&self) -> Vec<StateTableId> {
221 let existing_table_ids: HashSet<TableId> =
222 HashSet::from_iter(self.existing_table_ids.clone());
223 self.get_table_ids_from_input_ssts()
224 .filter(|table_id| existing_table_ids.contains(table_id))
225 .collect()
226 }
227
228 pub fn is_expired(&self, compaction_group_version_id_expected: u64) -> bool {
229 is_compaction_task_expired(
230 self.compaction_group_version_id,
231 compaction_group_version_id_expected,
232 )
233 }
234
235 pub fn should_use_block_based_filter(&self) -> bool {
238 let kv_count = self
239 .input_ssts
240 .iter()
241 .flat_map(|level| level.table_infos.iter())
242 .map(|sst| sst.total_key_count)
243 .sum::<u64>();
244
245 crate::filter_utils::is_kv_count_too_large_for_xor16(kv_count, self.max_kv_count_for_xor16)
246 }
247
248 pub fn effective_max_vnode_key_range_bytes(&self) -> Option<usize> {
252 let limit = self.max_vnode_key_range_bytes.filter(|&v| v > 0)? as usize;
253 (self.build_compact_table_ids().len() == 1).then_some(limit)
254 }
255}
256
257fn split_watermark_serde_types(
258 pb_compact_task: &PbCompactTask,
259) -> (
260 BTreeMap<TableId, TableWatermarks>,
261 BTreeMap<TableId, TableWatermarks>,
262 BTreeMap<TableId, TableWatermarks>,
263) {
264 let mut pk_prefix_table_watermarks = BTreeMap::default();
265 let mut non_pk_prefix_table_watermarks = BTreeMap::default();
266 let mut value_table_watermarks = BTreeMap::default();
267 for (table_id, pbwatermark) in &pb_compact_task.table_watermarks {
268 let watermark = TableWatermarks::from(pbwatermark);
269 match watermark.watermark_type {
270 WatermarkSerdeType::PkPrefix => {
271 pk_prefix_table_watermarks.insert(*table_id, watermark);
272 }
273 WatermarkSerdeType::NonPkPrefix => {
274 non_pk_prefix_table_watermarks.insert(*table_id, watermark);
275 }
276 WatermarkSerdeType::Value => {
277 value_table_watermarks.insert(*table_id, watermark);
278 }
279 }
280 }
281 (
282 pk_prefix_table_watermarks,
283 non_pk_prefix_table_watermarks,
284 value_table_watermarks,
285 )
286}
287
/// Returns `true` when the compaction group version id recorded in a task differs from the
/// expected one, and `false` when the two are equal.
pub fn is_compaction_task_expired(
    compaction_group_version_id_in_task: u64,
    compaction_group_version_id_expected: u64,
) -> bool {
    let still_current =
        compaction_group_version_id_in_task == compaction_group_version_id_expected;
    !still_current
}
294
295impl From<PbCompactTask> for CompactTask {
296 fn from(pb_compact_task: PbCompactTask) -> Self {
297 let (pk_prefix_table_watermarks, non_pk_prefix_table_watermarks, value_table_watermarks) =
298 split_watermark_serde_types(&pb_compact_task);
299 #[expect(deprecated)]
300 Self {
301 input_ssts: pb_compact_task
302 .input_ssts
303 .into_iter()
304 .map(InputLevel::from)
305 .collect_vec(),
306 splits: pb_compact_task
307 .splits
308 .into_iter()
309 .map(|pb_keyrange| KeyRange {
310 left: pb_keyrange.left.into(),
311 right: pb_keyrange.right.into(),
312 right_exclusive: pb_keyrange.right_exclusive,
313 })
314 .collect_vec(),
315 sorted_output_ssts: pb_compact_task
316 .sorted_output_ssts
317 .into_iter()
318 .map(SstableInfo::from)
319 .collect_vec(),
320 task_id: pb_compact_task.task_id,
321 target_level: pb_compact_task.target_level,
322 gc_delete_keys: pb_compact_task.gc_delete_keys,
323 base_level: pb_compact_task.base_level,
324 task_status: TaskStatus::try_from(pb_compact_task.task_status).unwrap(),
325 compaction_group_id: pb_compact_task.compaction_group_id,
326 existing_table_ids: pb_compact_task.existing_table_ids.clone(),
327 compression_algorithm: pb_compact_task.compression_algorithm,
328 target_file_size: pb_compact_task.target_file_size,
329 compaction_filter_mask: pb_compact_task.compaction_filter_mask,
330 table_options: pb_compact_task
331 .table_options
332 .iter()
333 .map(|(table_id, v)| (*table_id, *v))
334 .collect(),
335 current_epoch_time: pb_compact_task.current_epoch_time,
336 target_sub_level_id: pb_compact_task.target_sub_level_id,
337 task_type: PbTaskType::try_from(pb_compact_task.task_type).unwrap(),
338 split_by_state_table: pb_compact_task.split_by_state_table,
339 split_weight_by_vnode: pb_compact_task.split_weight_by_vnode,
340 table_vnode_partition: pb_compact_task
341 .table_vnode_partition
342 .iter()
343 .map(|(table_id, v)| (*table_id, *v))
344 .collect(),
345 pk_prefix_table_watermarks,
346 non_pk_prefix_table_watermarks,
347 value_table_watermarks,
348 table_schemas: pb_compact_task
349 .table_schemas
350 .iter()
351 .map(|(table_id, v)| (*table_id, v.clone()))
352 .collect(),
353 max_sub_compaction: pb_compact_task.max_sub_compaction,
354 compaction_group_version_id: pb_compact_task.compaction_group_version_id,
355 max_kv_count_for_xor16: pb_compact_task.max_kv_count_for_xor16,
356 max_vnode_key_range_bytes: pb_compact_task.max_vnode_key_range_bytes,
357 }
358 }
359}
360
361impl From<&PbCompactTask> for CompactTask {
362 fn from(pb_compact_task: &PbCompactTask) -> Self {
363 let (pk_prefix_table_watermarks, non_pk_prefix_table_watermarks, value_table_watermarks) =
364 split_watermark_serde_types(pb_compact_task);
365 #[expect(deprecated)]
366 Self {
367 input_ssts: pb_compact_task
368 .input_ssts
369 .iter()
370 .map(InputLevel::from)
371 .collect_vec(),
372 splits: pb_compact_task
373 .splits
374 .iter()
375 .map(|pb_keyrange| KeyRange {
376 left: pb_keyrange.left.clone().into(),
377 right: pb_keyrange.right.clone().into(),
378 right_exclusive: pb_keyrange.right_exclusive,
379 })
380 .collect_vec(),
381 sorted_output_ssts: pb_compact_task
382 .sorted_output_ssts
383 .iter()
384 .map(SstableInfo::from)
385 .collect_vec(),
386 task_id: pb_compact_task.task_id,
387 target_level: pb_compact_task.target_level,
388 gc_delete_keys: pb_compact_task.gc_delete_keys,
389 base_level: pb_compact_task.base_level,
390 task_status: TaskStatus::try_from(pb_compact_task.task_status).unwrap(),
391 compaction_group_id: pb_compact_task.compaction_group_id,
392 existing_table_ids: pb_compact_task.existing_table_ids.clone(),
393 compression_algorithm: pb_compact_task.compression_algorithm,
394 target_file_size: pb_compact_task.target_file_size,
395 compaction_filter_mask: pb_compact_task.compaction_filter_mask,
396 table_options: pb_compact_task
397 .table_options
398 .iter()
399 .map(|(table_id, v)| (*table_id, *v))
400 .collect(),
401 current_epoch_time: pb_compact_task.current_epoch_time,
402 target_sub_level_id: pb_compact_task.target_sub_level_id,
403 task_type: PbTaskType::try_from(pb_compact_task.task_type).unwrap(),
404 split_by_state_table: pb_compact_task.split_by_state_table,
405 split_weight_by_vnode: pb_compact_task.split_weight_by_vnode,
406 table_vnode_partition: pb_compact_task
407 .table_vnode_partition
408 .iter()
409 .map(|(table_id, v)| (*table_id, *v))
410 .collect(),
411 pk_prefix_table_watermarks,
412 non_pk_prefix_table_watermarks,
413 value_table_watermarks,
414 table_schemas: pb_compact_task
415 .table_schemas
416 .iter()
417 .map(|(table_id, v)| (*table_id, v.clone()))
418 .collect(),
419 max_sub_compaction: pb_compact_task.max_sub_compaction,
420 compaction_group_version_id: pb_compact_task.compaction_group_version_id,
421 max_kv_count_for_xor16: pb_compact_task.max_kv_count_for_xor16,
422 max_vnode_key_range_bytes: pb_compact_task.max_vnode_key_range_bytes,
423 }
424 }
425}
426
427impl From<CompactTask> for PbCompactTask {
428 fn from(compact_task: CompactTask) -> Self {
429 #[expect(deprecated)]
430 Self {
431 input_ssts: compact_task
432 .input_ssts
433 .into_iter()
434 .map(|input_level| input_level.into())
435 .collect_vec(),
436 splits: compact_task
437 .splits
438 .into_iter()
439 .map(|keyrange| PbKeyRange {
440 left: keyrange.left.into(),
441 right: keyrange.right.into(),
442 right_exclusive: keyrange.right_exclusive,
443 })
444 .collect_vec(),
445 sorted_output_ssts: compact_task
446 .sorted_output_ssts
447 .into_iter()
448 .map(|sst| sst.into())
449 .collect_vec(),
450 task_id: compact_task.task_id,
451 target_level: compact_task.target_level,
452 gc_delete_keys: compact_task.gc_delete_keys,
453 base_level: compact_task.base_level,
454 task_status: compact_task.task_status.into(),
455 compaction_group_id: compact_task.compaction_group_id,
456 existing_table_ids: compact_task.existing_table_ids.clone(),
457 compression_algorithm: compact_task.compression_algorithm,
458 target_file_size: compact_task.target_file_size,
459 compaction_filter_mask: compact_task.compaction_filter_mask,
460 table_options: compact_task.table_options.clone(),
461 current_epoch_time: compact_task.current_epoch_time,
462 target_sub_level_id: compact_task.target_sub_level_id,
463 task_type: compact_task.task_type.into(),
464 split_weight_by_vnode: compact_task.split_weight_by_vnode,
465 table_vnode_partition: compact_task.table_vnode_partition.clone(),
466 table_watermarks: compact_task
467 .pk_prefix_table_watermarks
468 .into_iter()
469 .chain(compact_task.non_pk_prefix_table_watermarks)
470 .chain(compact_task.value_table_watermarks)
471 .map(|(table_id, table_watermark)| (table_id, table_watermark.into()))
472 .collect(),
473 split_by_state_table: compact_task.split_by_state_table,
474 table_schemas: compact_task.table_schemas.clone(),
475 max_sub_compaction: compact_task.max_sub_compaction,
476 compaction_group_version_id: compact_task.compaction_group_version_id,
477 max_kv_count_for_xor16: compact_task.max_kv_count_for_xor16,
478 max_vnode_key_range_bytes: compact_task.max_vnode_key_range_bytes,
479 }
480 }
481}
482
483impl From<&CompactTask> for PbCompactTask {
484 fn from(compact_task: &CompactTask) -> Self {
485 #[expect(deprecated)]
486 Self {
487 input_ssts: compact_task
488 .input_ssts
489 .iter()
490 .map(|input_level| input_level.into())
491 .collect_vec(),
492 splits: compact_task
493 .splits
494 .iter()
495 .map(|keyrange| PbKeyRange {
496 left: keyrange.left.to_vec(),
497 right: keyrange.right.to_vec(),
498 right_exclusive: keyrange.right_exclusive,
499 })
500 .collect_vec(),
501 sorted_output_ssts: compact_task
502 .sorted_output_ssts
503 .iter()
504 .map(|sst| sst.into())
505 .collect_vec(),
506 task_id: compact_task.task_id,
507 target_level: compact_task.target_level,
508 gc_delete_keys: compact_task.gc_delete_keys,
509 base_level: compact_task.base_level,
510 task_status: compact_task.task_status.into(),
511 compaction_group_id: compact_task.compaction_group_id,
512 existing_table_ids: compact_task.existing_table_ids.clone(),
513 compression_algorithm: compact_task.compression_algorithm,
514 target_file_size: compact_task.target_file_size,
515 compaction_filter_mask: compact_task.compaction_filter_mask,
516 table_options: compact_task.table_options.clone(),
517 current_epoch_time: compact_task.current_epoch_time,
518 target_sub_level_id: compact_task.target_sub_level_id,
519 task_type: compact_task.task_type.into(),
520 split_weight_by_vnode: compact_task.split_weight_by_vnode,
521 table_vnode_partition: compact_task.table_vnode_partition.clone(),
522 table_watermarks: compact_task
523 .pk_prefix_table_watermarks
524 .iter()
525 .chain(compact_task.non_pk_prefix_table_watermarks.iter())
526 .chain(compact_task.value_table_watermarks.iter())
527 .map(|(table_id, table_watermark)| (*table_id, table_watermark.into()))
528 .collect(),
529 split_by_state_table: compact_task.split_by_state_table,
530 table_schemas: compact_task.table_schemas.clone(),
531 max_sub_compaction: compact_task.max_sub_compaction,
532 compaction_group_version_id: compact_task.compaction_group_version_id,
533 max_kv_count_for_xor16: compact_task.max_kv_count_for_xor16,
534 max_vnode_key_range_bytes: compact_task.max_vnode_key_range_bytes,
535 }
536 }
537}
538
/// In-memory form of [`PbValidationTask`].
#[derive(Clone, PartialEq, Default)]
pub struct ValidationTask {
    /// SSTs carried by the task.
    pub sst_infos: Vec<SstableInfo>,
    /// Maps an SST object id to a worker id.
    /// NOTE(review): presumably the worker that produced the object — confirm with the
    /// producer of this task.
    pub sst_id_to_worker_id: HashMap<HummockSstableObjectId, WorkerId>,
}
544
545impl From<PbValidationTask> for ValidationTask {
546 fn from(pb_validation_task: PbValidationTask) -> Self {
547 Self {
548 sst_infos: pb_validation_task
549 .sst_infos
550 .into_iter()
551 .map(SstableInfo::from)
552 .collect_vec(),
553 sst_id_to_worker_id: pb_validation_task.sst_id_to_worker_id.into_iter().collect(),
554 }
555 }
556}
557
558impl From<ValidationTask> for PbValidationTask {
559 fn from(validation_task: ValidationTask) -> Self {
560 Self {
561 sst_infos: validation_task
562 .sst_infos
563 .into_iter()
564 .map(|sst| sst.into())
565 .collect_vec(),
566 sst_id_to_worker_id: validation_task.sst_id_to_worker_id,
567 }
568 }
569}
570
571impl ValidationTask {
572 pub fn estimated_encode_len(&self) -> usize {
573 self.sst_infos
574 .iter()
575 .map(|sst| sst.estimated_encode_len())
576 .sum::<usize>()
577 + self.sst_id_to_worker_id.len() * (size_of::<u64>() + size_of::<u32>())
578 + size_of::<u64>()
579 }
580}
581
/// In-memory form of [`PbReportTask`].
#[derive(Clone, PartialEq, Default, Debug)]
pub struct ReportTask {
    /// Per-table statistics carried by the report.
    /// NOTE(review): the name suggests these are deltas — confirm with the consumer.
    pub table_stats_change: HashMap<TableId, PbTableStats>,
    /// Identifier of the task being reported.
    pub task_id: u64,
    /// Status of the task; decoded from the protobuf integer on conversion.
    pub task_status: TaskStatus,
    /// Output SSTs of the task.
    pub sorted_output_ssts: Vec<SstableInfo>,
    /// Maps an SST object id to a `u64` timestamp.
    /// NOTE(review): unit and origin of the timestamp are not visible here.
    pub object_timestamps: HashMap<HummockSstableObjectId, u64>,
}
590
591impl From<PbReportTask> for ReportTask {
592 fn from(value: PbReportTask) -> Self {
593 Self {
594 table_stats_change: value.table_stats_change.clone(),
595 task_id: value.task_id,
596 task_status: PbTaskStatus::try_from(value.task_status).unwrap(),
597 sorted_output_ssts: value
598 .sorted_output_ssts
599 .into_iter()
600 .map(SstableInfo::from)
601 .collect_vec(),
602 object_timestamps: value.object_timestamps,
603 }
604 }
605}
606
607impl From<ReportTask> for PbReportTask {
608 fn from(value: ReportTask) -> Self {
609 Self {
610 table_stats_change: value.table_stats_change.clone(),
611 task_id: value.task_id,
612 task_status: value.task_status.into(),
613 sorted_output_ssts: value
614 .sorted_output_ssts
615 .into_iter()
616 .map(|sst| sst.into())
617 .collect_vec(),
618 object_timestamps: value.object_timestamps,
619 }
620 }
621}