use std::collections::BTreeMap;
use std::fmt::Debug;
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::{Arc, LazyLock};

use bergloom_core::compaction::{
    Compaction, CompactionType, RewriteDataFilesCommitManagerRetryConfig,
};
use bergloom_core::config::CompactionConfigBuilder;
use derive_builder::Builder;
use iceberg::{Catalog, TableIdent};
use mixtrics::registry::prometheus::PrometheusMetricsRegistry;
use parquet::basic::Compression;
use parquet::file::properties::WriterProperties;
use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
use risingwave_connector::sink::iceberg::IcebergConfig;
use risingwave_pb::iceberg_compaction::IcebergCompactionTask;
use thiserror_ext::AsReport;
use tokio::sync::oneshot::Receiver;

use super::HummockResult;
use crate::hummock::HummockError;
use crate::monitor::CompactorMetrics;

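/// Prometheus registry handed to Bergloom so its compaction metrics are
/// exported through RisingWave's global metrics registry.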
static BERGLOOM_METRICS_REGISTRY: LazyLock<Box<PrometheusMetricsRegistry>> = LazyLock::new(|| {
    Box::new(PrometheusMetricsRegistry::new(
        GLOBAL_METRICS_REGISTRY.clone(),
    ))
});

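/// Runs a single Iceberg compaction task: resolves the target table from the
/// task's properties, sizes the job, and drives a full Bergloom compaction.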
pub struct IcebergCompactorRunner {
    pub task_id: u64,
    pub catalog: Arc<dyn Catalog>,
    pub table_ident: TableIdent,
    pub iceberg_config: IcebergConfig,

    config: IcebergCompactorRunnerConfig,
    metrics: Arc<CompactorMetrics>,
}

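/// Default Parquet writer properties for compaction output: SNAPPY compression,
/// with `created_by` set to the current RisingWave version.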
pub fn default_writer_properties() -> WriterProperties {
    WriterProperties::builder()
        .set_compression(Compression::SNAPPY)
        .set_created_by(concat!("risingwave version ", env!("CARGO_PKG_VERSION")).to_owned())
        .build()
}

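/// Tuning knobs for a compaction run. `derive_builder` generates
/// `IcebergCompactorRunnerConfigBuilder`, so a config can be assembled like
/// this (the values here are illustrative, not recommendations):
///
/// ```ignore
/// let config = IcebergCompactorRunnerConfigBuilder::default()
///     .max_parallelism(8)
///     .target_file_size_bytes(512 * 1024 * 1024) // 512 MiB
///     .build()
///     .unwrap();
/// ```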
#[derive(Builder, Debug, Clone)]
pub struct IcebergCompactorRunnerConfig {
    #[builder(default = "4")]
    pub max_parallelism: u32,
    #[builder(default = "1024 * 1024 * 1024")]
    pub min_size_per_partition: u64,
    #[builder(default = "32")]
    pub max_file_count_per_partition: u32,
    #[builder(default = "1024 * 1024 * 1024")]
    pub target_file_size_bytes: u64,
    #[builder(default = "false")]
    pub enable_validate_compaction: bool,
    #[builder(default = "1024")]
    pub max_record_batch_rows: usize,
    #[builder(default = "default_writer_properties()")]
    pub write_parquet_properties: WriterProperties,
}

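/// Shared bookkeeping for how much compaction parallelism is currently in use
/// on this node, used to decide whether a new task may start.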
#[derive(Debug, Clone)]
pub(crate) struct RunnerContext {
    pub max_available_parallelism: u32,
    pub running_task_parallelism: Arc<AtomicU32>,
}

impl RunnerContext {
    pub fn new(max_available_parallelism: u32, running_task_parallelism: Arc<AtomicU32>) -> Self {
        Self {
            max_available_parallelism,
            running_task_parallelism,
        }
    }

    pub fn is_available_parallelism_sufficient(&self, input_parallelism: u32) -> bool {
        self.max_available_parallelism
            .saturating_sub(self.running_task_parallelism.load(Ordering::SeqCst))
            >= input_parallelism
    }

    pub fn incr_running_task_parallelism(&self, increment: u32) {
        self.running_task_parallelism
            .fetch_add(increment, Ordering::SeqCst);
    }

    pub fn decr_running_task_parallelism(&self, decrement: u32) {
        self.running_task_parallelism
            .fetch_sub(decrement, Ordering::SeqCst);
    }
}

impl IcebergCompactorRunner {
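    /// Builds a runner from a compaction task: parses the Iceberg connection
    /// properties, creates the catalog, and resolves the table identifier.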
    pub async fn new(
        iceberg_compaction_task: IcebergCompactionTask,
        config: IcebergCompactorRunnerConfig,
        metrics: Arc<CompactorMetrics>,
    ) -> HummockResult<Self> {
        let IcebergCompactionTask { task_id, props } = iceberg_compaction_task;
        let iceberg_config = IcebergConfig::from_btreemap(BTreeMap::from_iter(props.into_iter()))
            .map_err(|e| HummockError::compaction_executor(e.as_report()))?;
        let catalog = iceberg_config
            .create_catalog()
            .await
            .map_err(|e| HummockError::compaction_executor(e.as_report()))?;
        let table_ident = iceberg_config
            .full_table_name()
            .map_err(|e| HummockError::compaction_executor(e.as_report()))?;

        Ok(Self {
            task_id,
            catalog,
            table_ident,
            iceberg_config,
            config,
            metrics,
        })
    }

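    /// Derives `(input_parallelism, output_parallelism)` from the task's file
    /// statistics:
    ///
    /// - `partition_by_size`: total bytes (data + delete files) divided by
    ///   `min_size_per_partition`, rounded up, at least 1.
    /// - `partition_by_count`: total file count divided by
    ///   `max_file_count_per_partition`, rounded up, at least 1.
    /// - `input_parallelism`: the larger of the two, capped by `max_parallelism`.
    /// - `output_parallelism`: `partition_by_size` capped by `input_parallelism`,
    ///   then forced to 1 when all data fits into a single target file.
    ///
    /// For example, with 500 MiB of input in 5 files, `min_size_per_partition`
    /// of 100 MiB, and `max_file_count_per_partition` of 10: partition_by_size
    /// is 5, partition_by_count is 1, so input parallelism is
    /// `min(5, max_parallelism)`.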
    pub fn calculate_task_parallelism_impl(
        task_id_for_logging: u64,
        table_ident_for_logging: &TableIdent,
        task_statistics: &IcebergCompactionTaskStatistics,
        max_parallelism: u32,
        min_size_per_partition: u64,
        max_file_count_per_partition: u32,
        target_file_size_bytes: u64,
    ) -> HummockResult<(u32, u32)> {
        if max_parallelism == 0 {
            return Err(HummockError::compaction_executor(
                "Max parallelism cannot be 0".to_owned(),
            ));
        }

        let total_file_size_for_partitioning = task_statistics.total_data_file_size
            + task_statistics.total_pos_del_file_size
            + task_statistics.total_eq_del_file_size;
        if total_file_size_for_partitioning == 0 {
            tracing::warn!(
                task_id = task_id_for_logging,
                table = ?table_ident_for_logging,
                "Total file size is 0, skipping parallelism calculation."
            );

            return Err(HummockError::compaction_executor(
                "No files to calculate_task_parallelism".to_owned(),
            ));
        }

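        // Roughly `min_size_per_partition` bytes per input partition, at least one.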
        let partition_by_size = total_file_size_for_partitioning
            .div_ceil(min_size_per_partition)
            .max(1) as u32;

        let total_files_count_for_partitioning = task_statistics.total_data_file_count
            + task_statistics.total_pos_del_file_count
            + task_statistics.total_eq_del_file_count;

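        // At most `max_file_count_per_partition` files per partition, at least one.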
        let partition_by_count = total_files_count_for_partitioning
            .div_ceil(max_file_count_per_partition)
            .max(1);

        let input_parallelism = partition_by_size
            .max(partition_by_count)
            .min(max_parallelism);

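        // Output parallelism follows the size-based estimate, capped by input
        // parallelism and the configured maximum.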
        let mut output_parallelism = partition_by_size
            .min(input_parallelism)
            .min(max_parallelism);

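        // Small-data heuristic: if all data fits into a single target file,
        // writing from multiple partitions would only produce small files.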
        if task_statistics.total_data_file_size > 0
            && task_statistics.total_data_file_size < target_file_size_bytes
            && output_parallelism > 1
        {
            tracing::debug!(
                task_id = task_id_for_logging,
                table = ?table_ident_for_logging,
                total_data_file_size = task_statistics.total_data_file_size,
                target_file_size_bytes = target_file_size_bytes,
                original_output_parallelism = output_parallelism,
                "Total data size is less than target file size, forcing output_parallelism to 1."
            );
            output_parallelism = 1;
        }

        tracing::debug!(
            task_id = task_id_for_logging,
            table = ?table_ident_for_logging,
            stats = ?task_statistics,
            config_max_parallelism = max_parallelism,
            config_min_size_per_partition = min_size_per_partition,
            config_max_file_count_per_partition = max_file_count_per_partition,
            config_target_file_size_bytes = target_file_size_bytes,
            calculated_partition_by_size = partition_by_size,
            calculated_partition_by_count = partition_by_count,
            final_input_parallelism = input_parallelism,
            final_output_parallelism = output_parallelism,
            "Calculated task parallelism"
        );

        Ok((input_parallelism, output_parallelism))
    }

    fn calculate_task_parallelism(
        &self,
        task_statistics: &IcebergCompactionTaskStatistics,
        max_parallelism: u32,
        min_size_per_partition: u64,
        max_file_count_per_partition: u32,
        target_file_size_bytes: u64,
    ) -> HummockResult<(u32, u32)> {
        if max_parallelism == 0 {
            return Err(HummockError::compaction_executor(
                "Max parallelism cannot be 0".to_owned(),
            ));
        }

        Self::calculate_task_parallelism_impl(
            self.task_id,
            &self.table_ident,
            task_statistics,
            max_parallelism,
            min_size_per_partition,
            max_file_count_per_partition,
            target_file_size_bytes,
        )
    }

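    /// Runs the compaction to completion, or aborts early if the shutdown
    /// signal fires or the node lacks spare parallelism. Errors from the
    /// compaction itself are logged rather than propagated.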
    pub async fn compact(
        self,
        context: RunnerContext,
        shutdown_rx: Receiver<()>,
    ) -> HummockResult<()> {
        let task_id = self.task_id;
        let now = std::time::Instant::now();

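        // Build the compaction future; it is raced against the shutdown signal below.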
        let compact = async move {
            let retry_config = RewriteDataFilesCommitManagerRetryConfig::default();
            let statistics = self
                .analyze_task_statistics()
                .await
                .map_err(|e| HummockError::compaction_executor(e.as_report()))?;

            let (input_parallelism, output_parallelism) = self
                .calculate_task_parallelism(
                    &statistics,
                    self.config.max_parallelism,
                    self.config.min_size_per_partition,
                    self.config.max_file_count_per_partition,
                    self.config.target_file_size_bytes,
                )
                .map_err(|e| HummockError::compaction_executor(e.as_report()))?;

            if !context.is_available_parallelism_sufficient(input_parallelism) {
                tracing::warn!(
                    "Available parallelism is less than input parallelism {}; task {} will not run",
                    input_parallelism,
                    task_id
                );
                return Err(HummockError::compaction_executor(
                    "Available parallelism is less than input parallelism",
                ));
            }


            let compaction_config = Arc::new(
                CompactionConfigBuilder::default()
                    .batch_parallelism(input_parallelism as usize)
                    .target_partitions(output_parallelism as usize)
                    .target_file_size(self.config.target_file_size_bytes)
                    .enable_validate_compaction(self.config.enable_validate_compaction)
                    .max_record_batch_rows(self.config.max_record_batch_rows)
                    .write_parquet_properties(self.config.write_parquet_properties.clone())
                    .build()
                    .unwrap_or_else(|e| {
                        panic!(
                            "Failed to build iceberg compaction write props: {:?}",
                            e.as_report()
                        );
                    }),
            );

            let compaction = Compaction::builder()
                .with_catalog(self.catalog.clone())
                .with_catalog_name(self.iceberg_config.catalog_name())
                .with_compaction_type(CompactionType::Full)
                .with_config(compaction_config)
                .with_table_ident(self.table_ident.clone())
                .with_executor_type(bergloom_core::executor::ExecutorType::DataFusion)
                .with_registry(BERGLOOM_METRICS_REGISTRY.clone())
                .with_retry_config(retry_config)
                .build()
                .await
                .map_err(|e| HummockError::compaction_executor(e.as_report()))?;

            context.incr_running_task_parallelism(input_parallelism);
            self.metrics.compact_task_pending_num.inc();
            self.metrics
                .compact_task_pending_parallelism
                .add(input_parallelism as _);

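            // Release the reserved parallelism and pending-task metrics on every
            // exit path, including errors and panics.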
            let _release_guard = scopeguard::guard(
                (input_parallelism, context.clone(), self.metrics.clone()),
                |(val, ctx, metrics_guard)| {
                    ctx.decr_running_task_parallelism(val);
                    metrics_guard.compact_task_pending_num.dec();
                    metrics_guard.compact_task_pending_parallelism.sub(val as _);
                },
            );

            tracing::info!(
                "Iceberg compaction task {} {}-{:?} started with input parallelism {} and output parallelism {}",
                task_id,
                self.iceberg_config.catalog_name(),
                self.table_ident,
                input_parallelism,
                output_parallelism
            );
            compaction
                .compact()
                .await
                .map_err(|e| HummockError::compaction_executor(e.as_report()))?;
            Ok::<(), HummockError>(())
        };

        tokio::select! {
            _ = shutdown_rx => {
                tracing::info!("Iceberg compaction task {} cancelled", task_id);
            }
            result = compact => {
                if let Err(e) = result {
                    tracing::warn!(
                        error = %e.as_report(),
                        "Compaction task {} failed",
                        task_id,
                    );
                }
            }
        }

        tracing::info!(
            "Iceberg compaction task {} finished in {} ms",
            task_id,
            now.elapsed().as_millis()
        );

        Ok(())
    }

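    /// Scans the manifests of the table's current snapshot and tallies file
    /// sizes and counts per content type (data, positional deletes, equality
    /// deletes).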
    async fn analyze_task_statistics(&self) -> HummockResult<IcebergCompactionTaskStatistics> {
        let table = self
            .catalog
            .load_table(&self.table_ident)
            .await
            .map_err(|e| HummockError::compaction_executor(e.as_report()))?;
        let manifest_list = table
            .metadata()
            .current_snapshot()
            .ok_or_else(|| HummockError::compaction_executor("Current snapshot not found"))?
            .load_manifest_list(table.file_io(), table.metadata())
            .await
            .map_err(|e| HummockError::compaction_executor(e.as_report()))?;

        let mut total_data_file_size: u64 = 0;
        let mut total_data_file_count = 0;
        let mut total_pos_del_file_size: u64 = 0;
        let mut total_pos_del_file_count = 0;
        let mut total_eq_del_file_size: u64 = 0;
        let mut total_eq_del_file_count = 0;

        for manifest_file in manifest_list.entries() {
            let manifest = manifest_file
                .load_manifest(table.file_io())
                .await
                .map_err(|e| HummockError::compaction_executor(e.as_report()))?;
            let (entries, _) = manifest.into_parts();
            for entry in entries {
                match entry.content_type() {
                    iceberg::spec::DataContentType::Data => {
                        total_data_file_size += entry.data_file().file_size_in_bytes();
                        total_data_file_count += 1;
                    }
                    iceberg::spec::DataContentType::EqualityDeletes => {
                        total_eq_del_file_size += entry.data_file().file_size_in_bytes();
                        total_eq_del_file_count += 1;
                    }
                    iceberg::spec::DataContentType::PositionDeletes => {
                        total_pos_del_file_size += entry.data_file().file_size_in_bytes();
                        total_pos_del_file_count += 1;
                    }
                }
            }
        }

        Ok(IcebergCompactionTaskStatistics {
            total_data_file_size,
            total_data_file_count: total_data_file_count as u32,
            total_pos_del_file_size,
            total_pos_del_file_count: total_pos_del_file_count as u32,
            total_eq_del_file_size,
            total_eq_del_file_count: total_eq_del_file_count as u32,
        })
    }
}

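/// Aggregate file statistics for one compaction task, grouped by Iceberg
/// content type.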
pub struct IcebergCompactionTaskStatistics {
    pub total_data_file_size: u64,
    pub total_data_file_count: u32,
    pub total_pos_del_file_size: u64,
    pub total_pos_del_file_count: u32,
    pub total_eq_del_file_size: u64,
    pub total_eq_del_file_count: u32,
}

impl Debug for IcebergCompactionTaskStatistics {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("IcebergCompactionTaskStatistics")
            .field("total_data_file_size", &self.total_data_file_size)
            .field("total_data_file_count", &self.total_data_file_count)
            .field("total_pos_del_file_size", &self.total_pos_del_file_size)
            .field("total_pos_del_file_count", &self.total_pos_del_file_count)
            .field("total_eq_del_file_size", &self.total_eq_del_file_size)
            .field("total_eq_del_file_count", &self.total_eq_del_file_count)
            .finish()
    }
}

#[cfg(test)]
mod tests {
    use iceberg::TableIdent;

    use super::*;

    fn default_test_config_params() -> IcebergCompactorRunnerConfig {
        IcebergCompactorRunnerConfigBuilder::default()
            .max_parallelism(4)
            .min_size_per_partition(1024 * 1024 * 100)
            .max_file_count_per_partition(10)
            .target_file_size_bytes(1024 * 1024 * 128)
            .build()
            .unwrap()
    }

    #[test]
    fn test_calculate_parallelism_no_files() {
        let config = default_test_config_params();
        let dummy_table_ident = TableIdent::from_strs(["db", "schema", "table"]).unwrap();
        let stats = IcebergCompactionTaskStatistics {
            total_data_file_size: 0,
            total_data_file_count: 0,
            total_pos_del_file_size: 0,
            total_pos_del_file_count: 0,
            total_eq_del_file_size: 0,
            total_eq_del_file_count: 0,
        };
        let result = IcebergCompactorRunner::calculate_task_parallelism_impl(
            1,
            &dummy_table_ident,
            &stats,
            config.max_parallelism,
            config.min_size_per_partition,
            config.max_file_count_per_partition,
            config.target_file_size_bytes,
        );

        assert!(result.is_err());
        if let Err(e) = result {
            assert!(
                e.to_string()
                    .contains("No files to calculate_task_parallelism")
            );
        }
    }

    #[test]
    fn test_calculate_parallelism_size_dominant() {
        let config = default_test_config_params();
        let dummy_table_ident = TableIdent::from_strs(["db", "schema", "table"]).unwrap();
        let stats = IcebergCompactionTaskStatistics {
            total_data_file_size: 1024 * 1024 * 500,
            total_data_file_count: 5,
            total_pos_del_file_size: 0,
            total_pos_del_file_count: 0,
            total_eq_del_file_size: 0,
            total_eq_del_file_count: 0,
        };
        let (input_p, output_p) = IcebergCompactorRunner::calculate_task_parallelism_impl(
            1,
            &dummy_table_ident,
            &stats,
            config.max_parallelism,
            config.min_size_per_partition,
            config.max_file_count_per_partition,
            config.target_file_size_bytes,
        )
        .unwrap();
        assert_eq!(input_p, 4);
        assert_eq!(output_p, 4);
    }

    #[test]
    fn test_calculate_parallelism_count_dominant() {
        let config = default_test_config_params();
        let dummy_table_ident = TableIdent::from_strs(["db", "schema", "table"]).unwrap();
        let stats = IcebergCompactionTaskStatistics {
            total_data_file_size: 1024 * 1024 * 50,
            total_data_file_count: 35,
            total_pos_del_file_size: 0,
            total_pos_del_file_count: 0,
            total_eq_del_file_size: 0,
            total_eq_del_file_count: 0,
        };
        let (input_p, output_p) = IcebergCompactorRunner::calculate_task_parallelism_impl(
            1,
            &dummy_table_ident,
            &stats,
            config.max_parallelism,
            config.min_size_per_partition,
            config.max_file_count_per_partition,
            config.target_file_size_bytes,
        )
        .unwrap();
        assert_eq!(input_p, 4);
        assert_eq!(output_p, 1);
    }

    #[test]
    fn test_calculate_parallelism_max_parallelism_cap() {
        let mut config = default_test_config_params();
        config.max_parallelism = 2;
        let dummy_table_ident = TableIdent::from_strs(["db", "schema", "table"]).unwrap();
        let stats = IcebergCompactionTaskStatistics {
            total_data_file_size: 1024 * 1024 * 500,
            total_data_file_count: 35,
            total_pos_del_file_size: 0,
            total_pos_del_file_count: 0,
            total_eq_del_file_size: 0,
            total_eq_del_file_count: 0,
        };
        let (input_p, output_p) = IcebergCompactorRunner::calculate_task_parallelism_impl(
            1,
            &dummy_table_ident,
            &stats,
            config.max_parallelism,
            config.min_size_per_partition,
            config.max_file_count_per_partition,
            config.target_file_size_bytes,
        )
        .unwrap();
        assert_eq!(input_p, 2);
        assert_eq!(output_p, 2);
    }

    #[test]
    fn test_calculate_parallelism_small_data_heuristic() {
        let config = default_test_config_params();
        let dummy_table_ident = TableIdent::from_strs(["db", "schema", "table"]).unwrap();
        let stats_for_heuristic = IcebergCompactionTaskStatistics {
            total_data_file_size: 1024 * 1024 * 60,
            total_data_file_count: 5,
            total_pos_del_file_size: 1024 * 1024 * 250,
            total_pos_del_file_count: 2,
            total_eq_del_file_size: 0,
            total_eq_del_file_count: 0,
        };
        let (input_p, output_p) = IcebergCompactorRunner::calculate_task_parallelism_impl(
            1,
            &dummy_table_ident,
            &stats_for_heuristic,
            config.max_parallelism,
            config.min_size_per_partition,
            config.max_file_count_per_partition,
            config.target_file_size_bytes,
        )
        .unwrap();
        assert_eq!(input_p, 4);
        assert_eq!(output_p, 1);
    }

    #[test]
    fn test_calculate_parallelism_all_empty_files() {
        let config = default_test_config_params();
        let dummy_table_ident = TableIdent::from_strs(["db", "schema", "table"]).unwrap();
        let stats = IcebergCompactionTaskStatistics {
            total_data_file_size: 0,
            total_data_file_count: 5,
            total_pos_del_file_size: 0,
            total_pos_del_file_count: 0,
            total_eq_del_file_size: 0,
            total_eq_del_file_count: 0,
        };
        let result = IcebergCompactorRunner::calculate_task_parallelism_impl(
            1,
            &dummy_table_ident,
            &stats,
            config.max_parallelism,
            config.min_size_per_partition,
            config.max_file_count_per_partition,
            config.target_file_size_bytes,
        );
        assert!(result.is_err());
        if let Err(e) = result {
            assert!(
                e.to_string()
                    .contains("No files to calculate_task_parallelism")
            );
        }
    }

    #[test]
    fn test_calculate_parallelism_max_parallelism_zero() {
        let mut config = default_test_config_params();
        config.max_parallelism = 0;
        let dummy_table_ident = TableIdent::from_strs(["db", "schema", "table"]).unwrap();
        let stats = IcebergCompactionTaskStatistics {
            total_data_file_size: 1024 * 1024 * 50,
            total_data_file_count: 5,
            total_pos_del_file_size: 0,
            total_pos_del_file_count: 0,
            total_eq_del_file_size: 0,
            total_eq_del_file_count: 0,
        };
        let result = IcebergCompactorRunner::calculate_task_parallelism_impl(
            1,
            &dummy_table_ident,
            &stats,
            config.max_parallelism,
            config.min_size_per_partition,
            config.max_file_count_per_partition,
            config.target_file_size_bytes,
        );
        assert!(result.is_err());
        if let Err(e) = result {
            assert!(e.to_string().contains("Max parallelism cannot be 0"));
        }
    }

    #[test]
    fn test_calculate_parallelism_only_delete_files() {
        let config = default_test_config_params();
        let dummy_table_ident = TableIdent::from_strs(["db", "schema", "table"]).unwrap();
        let stats = IcebergCompactionTaskStatistics {
            total_data_file_size: 0,
            total_data_file_count: 0,
            total_pos_del_file_size: 1024 * 1024 * 20,
            total_pos_del_file_count: 2,
            total_eq_del_file_size: 0,
            total_eq_del_file_count: 0,
        };
        let (input_p, output_p) = IcebergCompactorRunner::calculate_task_parallelism_impl(
            1,
            &dummy_table_ident,
            &stats,
            config.max_parallelism,
            config.min_size_per_partition,
            config.max_file_count_per_partition,
            config.target_file_size_bytes,
        )
        .unwrap();
        assert_eq!(input_p, 1);
        assert_eq!(output_p, 1);
    }

    #[test]
    fn test_calculate_parallelism_zero_file_count_non_zero_size() {
        let config = default_test_config_params();
        let dummy_table_ident = TableIdent::from_strs(["db", "schema", "table"]).unwrap();
        let stats = IcebergCompactionTaskStatistics {
            total_data_file_size: 1024 * 1024 * 200,
            total_data_file_count: 0,
            total_pos_del_file_size: 0,
            total_pos_del_file_count: 0,
            total_eq_del_file_size: 0,
            total_eq_del_file_count: 0,
        };
        let (input_p, output_p) = IcebergCompactorRunner::calculate_task_parallelism_impl(
            1,
            &dummy_table_ident,
            &stats,
            config.max_parallelism,
            config.min_size_per_partition,
            config.max_file_count_per_partition,
            config.target_file_size_bytes,
        )
        .unwrap();
        assert_eq!(input_p, 2);
        assert_eq!(output_p, 2);
    }
}