risingwave_storage/hummock/iceberg_compactor_runner.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::BTreeMap;
use std::fmt::Debug;
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::{Arc, LazyLock};

use derive_builder::Builder;
use iceberg::{Catalog, TableIdent};
use iceberg_compaction_core::compaction::{
    Compaction, CompactionType, RewriteDataFilesCommitManagerRetryConfig,
};
use iceberg_compaction_core::config::CompactionConfigBuilder;
use iceberg_compaction_core::executor::RewriteFilesStat;
use mixtrics::registry::prometheus::PrometheusMetricsRegistry;
use parquet::basic::Compression;
use parquet::file::properties::WriterProperties;
use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
use risingwave_connector::sink::iceberg::IcebergConfig;
use risingwave_pb::iceberg_compaction::IcebergCompactionTask;
use thiserror_ext::AsReport;
use tokio::sync::oneshot::Receiver;

use super::HummockResult;
use crate::hummock::HummockError;
use crate::monitor::CompactorMetrics;

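/// Metrics registry handed to the embedded iceberg-compaction engine. It wraps the
/// process-wide Prometheus registry so compaction metrics land in the same scrape.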
static BERGLOOM_METRICS_REGISTRY: LazyLock<Box<PrometheusMetricsRegistry>> = LazyLock::new(|| {
    Box::new(PrometheusMetricsRegistry::new(
        GLOBAL_METRICS_REGISTRY.clone(),
    ))
});

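/// Runs a single Iceberg compaction task end to end: it loads the target table via
/// the catalog, sizes the job from table statistics, and drives the rewrite.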
pub struct IcebergCompactorRunner {
    pub task_id: u64,
    pub catalog: Arc<dyn Catalog>,
    pub table_ident: TableIdent,
    pub iceberg_config: IcebergConfig,

    config: IcebergCompactorRunnerConfig,
    metrics: Arc<CompactorMetrics>,
}

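/// Default Parquet writer properties for compaction output: Snappy compression and a
/// `created_by` string carrying the RisingWave version.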
pub fn default_writer_properties() -> WriterProperties {
    WriterProperties::builder()
        .set_compression(Compression::SNAPPY)
        .set_created_by(concat!("risingwave version ", env!("CARGO_PKG_VERSION")).to_owned())
        .build()
}

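/// Knobs that bound a compaction task. Parallelism is derived from these limits and
/// the task's file statistics in `calculate_task_parallelism_impl`.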
#[derive(Builder, Debug, Clone)]
pub struct IcebergCompactorRunnerConfig {
    #[builder(default = "4")]
    pub max_parallelism: u32,
    #[builder(default = "1024 * 1024 * 1024")] // 1GB
    pub min_size_per_partition: u64,
    #[builder(default = "32")]
    pub max_file_count_per_partition: u32,
    #[builder(default = "1024 * 1024 * 1024")] // 1GB
    pub target_file_size_bytes: u64,
    #[builder(default = "false")]
    pub enable_validate_compaction: bool,
    #[builder(default = "1024")]
    pub max_record_batch_rows: usize,
    #[builder(default = "default_writer_properties()")]
    pub write_parquet_properties: WriterProperties,
}

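/// Best-effort accounting of how much parallelism is already in use on this
/// compactor, shared across concurrently running tasks. The availability check is
/// advisory: the load and the later increment are separate atomic operations, so
/// concurrent admissions can race.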
#[derive(Debug, Clone)]
pub(crate) struct RunnerContext {
    pub max_available_parallelism: u32,
    pub running_task_parallelism: Arc<AtomicU32>,
}

impl RunnerContext {
    pub fn new(max_available_parallelism: u32, running_task_parallelism: Arc<AtomicU32>) -> Self {
        Self {
            max_available_parallelism,
            running_task_parallelism,
        }
    }

    pub fn is_available_parallelism_sufficient(&self, input_parallelism: u32) -> bool {
        // `saturating_sub` guards against underflow if the running counter ever
        // exceeds the configured maximum.
        self.max_available_parallelism
            .saturating_sub(self.running_task_parallelism.load(Ordering::SeqCst))
            >= input_parallelism
    }

    pub fn incr_running_task_parallelism(&self, increment: u32) {
        self.running_task_parallelism
            .fetch_add(increment, Ordering::SeqCst);
    }

    pub fn decr_running_task_parallelism(&self, decrement: u32) {
        self.running_task_parallelism
            .fetch_sub(decrement, Ordering::SeqCst);
    }
}

impl IcebergCompactorRunner {
    pub async fn new(
        iceberg_compaction_task: IcebergCompactionTask,
        config: IcebergCompactorRunnerConfig,
        metrics: Arc<CompactorMetrics>,
    ) -> HummockResult<Self> {
        let IcebergCompactionTask { task_id, props } = iceberg_compaction_task;
        let iceberg_config = IcebergConfig::from_btreemap(BTreeMap::from_iter(props.into_iter()))
            .map_err(|e| HummockError::compaction_executor(e.as_report()))?;
        let catalog = iceberg_config
            .create_catalog()
            .await
            .map_err(|e| HummockError::compaction_executor(e.as_report()))?;
        let table_ident = iceberg_config
            .full_table_name()
            .map_err(|e| HummockError::compaction_executor(e.as_report()))?;

        Ok(Self {
            task_id,
            catalog,
            table_ident,
            iceberg_config,
            config,
            metrics,
        })
    }

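    /// Derives `(input_parallelism, output_parallelism)` for a task from its file
    /// statistics and the configured limits.
    ///
    /// Worked example (numbers mirror `test_calculate_parallelism_size_dominant`):
    /// 500 MB of data in 5 files, `min_size_per_partition` = 100 MB,
    /// `max_file_count_per_partition` = 10, `max_parallelism` = 4:
    ///
    /// - `partition_by_size`  = ceil(500 / 100) = 5
    /// - `partition_by_count` = max(ceil(5 / 10), 1) = 1
    /// - `input_parallelism`  = min(max(5, 1), 4) = 4
    /// - `output_parallelism` = min(5, 4, 4) = 4; the small-data heuristic does not
    ///   fire since the total data size is not below the target file size.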
    pub fn calculate_task_parallelism_impl(
        task_id_for_logging: u64,
        table_ident_for_logging: &TableIdent,
        task_statistics: &IcebergCompactionTaskStatistics,
        max_parallelism: u32,
        min_size_per_partition: u64,
        max_file_count_per_partition: u32,
        target_file_size_bytes: u64,
    ) -> HummockResult<(u32, u32)> {
        if max_parallelism == 0 {
            return Err(HummockError::compaction_executor(
                "Max parallelism cannot be 0".to_owned(),
            ));
        }

        let total_file_size_for_partitioning = task_statistics.total_data_file_size
            + task_statistics.total_pos_del_file_size
            + task_statistics.total_eq_del_file_size;
        if total_file_size_for_partitioning == 0 {
            // A total file size of 0 means there is nothing to compact,
            // so the task is rejected rather than partitioned.
            tracing::warn!(
                task_id = task_id_for_logging,
                table = ?table_ident_for_logging,
                "Total file size is 0; nothing to compact."
            );

            return Err(HummockError::compaction_executor(
                "No files to calculate_task_parallelism".to_owned(),
            ));
        }

        let partition_by_size = total_file_size_for_partitioning
            .div_ceil(min_size_per_partition)
            .max(1) as u32; // Ensure at least one partition.

        let total_files_count_for_partitioning = task_statistics.total_data_file_count
            + task_statistics.total_pos_del_file_count
            + task_statistics.total_eq_del_file_count;

        let partition_by_count = total_files_count_for_partitioning
            .div_ceil(max_file_count_per_partition)
            .max(1); // Ensure at least one partition.

        let input_parallelism = partition_by_size
            .max(partition_by_count)
            .min(max_parallelism);

        // `output_parallelism` should exceed neither `input_parallelism` nor
        // `max_parallelism`. It is primarily driven by size to avoid producing
        // many small output files.
        let mut output_parallelism = partition_by_size
            .min(input_parallelism)
            .min(max_parallelism);

        // Heuristic: if the total task data size is very small (less than `target_file_size_bytes`),
        // force `output_parallelism` to 1 to encourage merging into a single, larger output file.
        if task_statistics.total_data_file_size > 0 // Only apply if there's data.
            && task_statistics.total_data_file_size < target_file_size_bytes
            && output_parallelism > 1
        {
            tracing::debug!(
                task_id = task_id_for_logging,
                table = ?table_ident_for_logging,
                total_data_file_size = task_statistics.total_data_file_size,
                target_file_size_bytes = target_file_size_bytes,
                original_output_parallelism = output_parallelism,
                "Total data size is less than target file size, forcing output_parallelism to 1."
            );
            output_parallelism = 1;
        }

        tracing::debug!(
            task_id = task_id_for_logging,
            table = ?table_ident_for_logging,
            stats = ?task_statistics,
            config_max_parallelism = max_parallelism,
            config_min_size_per_partition = min_size_per_partition,
            config_max_file_count_per_partition = max_file_count_per_partition,
            config_target_file_size_bytes = target_file_size_bytes,
            calculated_partition_by_size = partition_by_size,
            calculated_partition_by_count = partition_by_count,
            final_input_parallelism = input_parallelism,
            final_output_parallelism = output_parallelism,
            "Calculated task parallelism"
        );

        Ok((input_parallelism, output_parallelism))
    }

    fn calculate_task_parallelism(
        &self,
        task_statistics: &IcebergCompactionTaskStatistics,
        max_parallelism: u32,
        min_size_per_partition: u64,
        max_file_count_per_partition: u32,
        target_file_size_bytes: u64,
    ) -> HummockResult<(u32, u32)> {
        // Input validation (e.g. rejecting `max_parallelism == 0`) happens in
        // `calculate_task_parallelism_impl`.
        Self::calculate_task_parallelism_impl(
            self.task_id,
            &self.table_ident,
            task_statistics,
            max_parallelism,
            min_size_per_partition,
            max_file_count_per_partition,
            target_file_size_bytes,
        )
    }

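    /// Runs the compaction task to completion, or aborts early if `shutdown_rx`
    /// fires. The flow: analyze table statistics, size the task, check that enough
    /// parallelism is free, then execute the rewrite while a scope guard keeps the
    /// shared parallelism counter and pending-task metrics balanced.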
    pub async fn compact(
        self,
        context: RunnerContext,
        shutdown_rx: Receiver<()>,
    ) -> HummockResult<()> {
        let task_id = self.task_id;
        let now = std::time::Instant::now();

        let compact = async move {
            let retry_config = RewriteDataFilesCommitManagerRetryConfig::default();
            let statistics = self
                .analyze_task_statistics()
                .await
                .map_err(|e| HummockError::compaction_executor(e.as_report()))?;

            let (input_parallelism, output_parallelism) = self
                .calculate_task_parallelism(
                    &statistics,
                    self.config.max_parallelism,
                    self.config.min_size_per_partition,
                    self.config.max_file_count_per_partition,
                    self.config.target_file_size_bytes,
                )
                .map_err(|e| HummockError::compaction_executor(e.as_report()))?;

            if !context.is_available_parallelism_sufficient(input_parallelism) {
                tracing::warn!(
                    "Available parallelism is less than input parallelism {}; task {} will not run",
                    input_parallelism,
                    task_id
                );
                return Err(HummockError::compaction_executor(
                    "Available parallelism is less than input parallelism",
                ));
            }

            let compaction_config = Arc::new(
                CompactionConfigBuilder::default()
                    .output_parallelism(output_parallelism as usize)
                    .executor_parallelism(input_parallelism as usize)
                    .target_file_size(self.config.target_file_size_bytes)
                    .enable_validate_compaction(self.config.enable_validate_compaction)
                    .max_record_batch_rows(self.config.max_record_batch_rows)
                    .write_parquet_properties(self.config.write_parquet_properties.clone())
                    .build()
                    .unwrap_or_else(|e| {
                        panic!(
                            "Failed to build iceberg compaction config: {:?}",
                            e.as_report()
                        );
                    }),
            );

            tracing::info!(
                task_id = task_id,
                table = ?self.table_ident,
                input_parallelism,
                output_parallelism,
                statistics = ?statistics,
                compaction_config = ?compaction_config,
                "Iceberg compaction task started",
            );

            let compaction = Compaction::builder()
                .with_catalog(self.catalog.clone())
                .with_catalog_name(self.iceberg_config.catalog_name())
                .with_compaction_type(CompactionType::Full)
                .with_config(compaction_config)
                .with_table_ident(self.table_ident.clone())
                .with_executor_type(iceberg_compaction_core::executor::ExecutorType::DataFusion)
                .with_registry(BERGLOOM_METRICS_REGISTRY.clone())
                .with_retry_config(retry_config)
                .build()
                .await
                .map_err(|e| HummockError::compaction_executor(e.as_report()))?;

            context.incr_running_task_parallelism(input_parallelism);
            self.metrics.compact_task_pending_num.inc();
            self.metrics
                .compact_task_pending_parallelism
                .add(input_parallelism as _);

            // Release the reserved parallelism and pending-task metrics on every
            // exit path, including errors.
            let _release_guard = scopeguard::guard(
                (input_parallelism, context.clone(), self.metrics.clone()),
                |(val, ctx, metrics_guard)| {
                    ctx.decr_running_task_parallelism(val);
                    metrics_guard.compact_task_pending_num.dec();
                    metrics_guard.compact_task_pending_parallelism.sub(val as _);
                },
            );

            let stat = compaction
                .compact()
                .await
                .map_err(|e| HummockError::compaction_executor(e.as_report()))?;
            Ok::<RewriteFilesStat, HummockError>(stat)
        };

        tokio::select! {
            _ = shutdown_rx => {
                tracing::info!(task_id = task_id, "Iceberg compaction task cancelled");
            }
            stat = compact => {
                match stat {
                    Ok(stat) => {
                        tracing::info!(
                            task_id = task_id,
                            elapsed_millis = now.elapsed().as_millis(),
                            stat = ?stat,
                            "Iceberg compaction task finished",
                        );
                    }

                    Err(e) => {
                        tracing::warn!(
                            error = %e.as_report(),
                            task_id = task_id,
                            "Iceberg compaction task failed with error",
                        );
                    }
                }
            }
        }

        Ok(())
    }

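    /// Loads the table's current snapshot and walks its manifest list, summing file
    /// sizes and counts per content type (data, position deletes, equality deletes).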
    async fn analyze_task_statistics(&self) -> HummockResult<IcebergCompactionTaskStatistics> {
        let table = self
            .catalog
            .load_table(&self.table_ident)
            .await
            .map_err(|e| HummockError::compaction_executor(e.as_report()))?;
        let manifest_list = table
            .metadata()
            .current_snapshot()
            .ok_or_else(|| HummockError::compaction_executor("Current snapshot not found"))?
            .load_manifest_list(table.file_io(), table.metadata())
            .await
            .map_err(|e| HummockError::compaction_executor(e.as_report()))?;

        let mut total_data_file_size: u64 = 0;
        let mut total_data_file_count = 0;
        let mut total_pos_del_file_size: u64 = 0;
        let mut total_pos_del_file_count = 0;
        let mut total_eq_del_file_size: u64 = 0;
        let mut total_eq_del_file_count = 0;

        for manifest_file in manifest_list.entries() {
            let manifest = manifest_file
                .load_manifest(table.file_io())
                .await
                .map_err(|e| HummockError::compaction_executor(e.as_report()))?;
            let (entries, _) = manifest.into_parts();
            for entry in entries {
                match entry.content_type() {
                    iceberg::spec::DataContentType::Data => {
                        total_data_file_size += entry.data_file().file_size_in_bytes();
                        total_data_file_count += 1;
                    }
                    iceberg::spec::DataContentType::EqualityDeletes => {
                        total_eq_del_file_size += entry.data_file().file_size_in_bytes();
                        total_eq_del_file_count += 1;
                    }
                    iceberg::spec::DataContentType::PositionDeletes => {
                        total_pos_del_file_size += entry.data_file().file_size_in_bytes();
                        total_pos_del_file_count += 1;
                    }
                }
            }
        }

        Ok(IcebergCompactionTaskStatistics {
            total_data_file_size,
            total_data_file_count: total_data_file_count as u32,
            total_pos_del_file_size,
            total_pos_del_file_count: total_pos_del_file_count as u32,
            total_eq_del_file_size,
            total_eq_del_file_count: total_eq_del_file_count as u32,
        })
    }
}

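/// Aggregate sizes and counts of the data, position-delete, and equality-delete
/// files in a table's current snapshot.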
pub struct IcebergCompactionTaskStatistics {
    pub total_data_file_size: u64,
    pub total_data_file_count: u32,
    pub total_pos_del_file_size: u64,
    pub total_pos_del_file_count: u32,
    pub total_eq_del_file_size: u64,
    pub total_eq_del_file_count: u32,
}

impl Debug for IcebergCompactionTaskStatistics {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("IcebergCompactionTaskStatistics")
            .field("total_data_file_size", &self.total_data_file_size)
            .field("total_data_file_count", &self.total_data_file_count)
            .field("total_pos_del_file_size", &self.total_pos_del_file_size)
            .field("total_pos_del_file_count", &self.total_pos_del_file_count)
            .field("total_eq_del_file_size", &self.total_eq_del_file_size)
            .field("total_eq_del_file_count", &self.total_eq_del_file_count)
            .finish()
    }
}

#[cfg(test)]
mod tests {
    use iceberg::TableIdent;

    use super::*;

    fn default_test_config_params() -> IcebergCompactorRunnerConfig {
        IcebergCompactorRunnerConfigBuilder::default()
            .max_parallelism(4)
            .min_size_per_partition(1024 * 1024 * 100) // 100MB
            .max_file_count_per_partition(10)
            .target_file_size_bytes(1024 * 1024 * 128) // 128MB
            .build()
            .unwrap()
    }

    #[test]
    fn test_calculate_parallelism_no_files() {
        let config = default_test_config_params();
        let dummy_table_ident = TableIdent::from_strs(["db", "schema", "table"]).unwrap();
        let stats = IcebergCompactionTaskStatistics {
            total_data_file_size: 0,
            total_data_file_count: 0,
            total_pos_del_file_size: 0,
            total_pos_del_file_count: 0,
            total_eq_del_file_size: 0,
            total_eq_del_file_count: 0,
        };
        let result = IcebergCompactorRunner::calculate_task_parallelism_impl(
            1,
            &dummy_table_ident,
            &stats,
            config.max_parallelism,
            config.min_size_per_partition,
            config.max_file_count_per_partition,
            config.target_file_size_bytes,
        );

        assert!(result.is_err());
        if let Err(e) = result {
            assert!(
                e.to_string()
                    .contains("No files to calculate_task_parallelism")
            );
        }
    }

    #[test]
    fn test_calculate_parallelism_size_dominant() {
        let config = default_test_config_params();
        let dummy_table_ident = TableIdent::from_strs(["db", "schema", "table"]).unwrap();
        let stats = IcebergCompactionTaskStatistics {
            total_data_file_size: 1024 * 1024 * 500, // 500MB
            total_data_file_count: 5,                // Low count
            total_pos_del_file_size: 0,
            total_pos_del_file_count: 0,
            total_eq_del_file_size: 0,
            total_eq_del_file_count: 0,
        };
        // partition_by_size = 500MB / 100MB = 5
        // partition_by_count = 5 / 10 = 1 (div_ceil)
        // initial_input = max(5,1) = 5
        // input = min(5, 4_max_p) = 4
        // output = min(partition_by_size=5, input=4, max_p=4) = 4
        let (input_p, output_p) = IcebergCompactorRunner::calculate_task_parallelism_impl(
            1,
            &dummy_table_ident,
            &stats,
            config.max_parallelism,
            config.min_size_per_partition,
            config.max_file_count_per_partition,
            config.target_file_size_bytes,
        )
        .unwrap();
        assert_eq!(input_p, 4);
        assert_eq!(output_p, 4);
    }

    #[test]
    fn test_calculate_parallelism_count_dominant() {
        let config = default_test_config_params();
        let dummy_table_ident = TableIdent::from_strs(["db", "schema", "table"]).unwrap();
        let stats = IcebergCompactionTaskStatistics {
            total_data_file_size: 1024 * 1024 * 50, // 50MB (low size)
            total_data_file_count: 35,              // High count
            total_pos_del_file_size: 0,
            total_pos_del_file_count: 0,
            total_eq_del_file_size: 0,
            total_eq_del_file_count: 0,
        };
        // partition_by_size = 50MB / 100MB = 1 (div_ceil)
        // partition_by_count = 35 / 10 = 4 (div_ceil)
        // initial_input = max(1,4) = 4
        // input = min(4, 4_max_p) = 4
        // output = min(partition_by_size=1, input=4, max_p=4) = 1
        let (input_p, output_p) = IcebergCompactorRunner::calculate_task_parallelism_impl(
            1,
            &dummy_table_ident,
            &stats,
            config.max_parallelism,
            config.min_size_per_partition,
            config.max_file_count_per_partition,
            config.target_file_size_bytes,
        )
        .unwrap();
        assert_eq!(input_p, 4);
        assert_eq!(output_p, 1);
    }

    #[test]
    fn test_calculate_parallelism_max_parallelism_cap() {
        let mut config = default_test_config_params();
        config.max_parallelism = 2; // Lower max_parallelism
        let dummy_table_ident = TableIdent::from_strs(["db", "schema", "table"]).unwrap();
        let stats = IcebergCompactionTaskStatistics {
            total_data_file_size: 1024 * 1024 * 500, // 500MB
            total_data_file_count: 35,               // High count
            total_pos_del_file_size: 0,
            total_pos_del_file_count: 0,
            total_eq_del_file_size: 0,
            total_eq_del_file_count: 0,
        };
        // partition_by_size = 500MB / 100MB = 5
        // partition_by_count = 35 / 10 = 4
        // initial_input = max(5,4) = 5
        // input = min(5, 2_max_p) = 2
        // output = min(partition_by_size=5, input=2, max_p=2) = 2
        let (input_p, output_p) = IcebergCompactorRunner::calculate_task_parallelism_impl(
            1,
            &dummy_table_ident,
            &stats,
            config.max_parallelism,
            config.min_size_per_partition,
            config.max_file_count_per_partition,
            config.target_file_size_bytes,
        )
        .unwrap();
        assert_eq!(input_p, 2);
        assert_eq!(output_p, 2);
    }

    #[test]
    fn test_calculate_parallelism_small_data_heuristic() {
        let config = default_test_config_params(); // target_file_size_bytes = 128MB
        let dummy_table_ident = TableIdent::from_strs(["db", "schema", "table"]).unwrap();
        // A stats set with 60MB of data in 15 files would give:
        //   partition_by_size = 60MB / 100MB = 1
        //   partition_by_count = 15 / 10 = 2
        //   initial_input = max(1,2) = 2
        //   input = min(2, 4_max_p) = 2
        //   initial_output = min(partition_by_size=1, input=2, max_p=4) = 1
        //   Heuristic: total_data_file_size (60MB) < target_file_size_bytes (128MB),
        //   but initial_output is already 1, so it stays 1.
        // To exercise the heuristic properly, make partition_by_size larger:
        let stats_for_heuristic = IcebergCompactionTaskStatistics {
            total_data_file_size: 1024 * 1024 * 60,     // 60MB
            total_data_file_count: 5,                   // Low count, so size dominates for input
            total_pos_del_file_size: 1024 * 1024 * 250, // Makes total size 310MB
            total_pos_del_file_count: 2,
            total_eq_del_file_size: 0,
            total_eq_del_file_count: 0,
        };
        // total_file_size_for_partitioning = 310MB
        // partition_by_size = 310MB / 100MB = 4 (div_ceil)
        // total_files_count_for_partitioning = 5 + 2 = 7
        // partition_by_count = 7 / 10 = 1
        // initial_input = max(4,1) = 4
        // input = min(4, 4_max_p) = 4
        // initial_output = min(partition_by_size=4, input=4, max_p=4) = 4
        // Heuristic: total_data_file_size (60MB) < target_file_size_bytes (128MB)
        // AND initial_output (4) > 1. So output becomes 1.
        let (input_p, output_p) = IcebergCompactorRunner::calculate_task_parallelism_impl(
            1,
            &dummy_table_ident,
            &stats_for_heuristic,
            config.max_parallelism,
            config.min_size_per_partition,
            config.max_file_count_per_partition,
            config.target_file_size_bytes,
        )
        .unwrap();
        assert_eq!(input_p, 4);
        assert_eq!(output_p, 1);
    }

    #[test]
    fn test_calculate_parallelism_all_empty_files() {
        let config = default_test_config_params();
        let dummy_table_ident = TableIdent::from_strs(["db", "schema", "table"]).unwrap();
        let stats = IcebergCompactionTaskStatistics {
            total_data_file_size: 0,
            total_data_file_count: 5, // 5 empty data files
            total_pos_del_file_size: 0,
            total_pos_del_file_count: 0,
            total_eq_del_file_size: 0,
            total_eq_del_file_count: 0,
        };
        // total_file_size_for_partitioning = 0, should return Err.
        let result = IcebergCompactorRunner::calculate_task_parallelism_impl(
            1,
            &dummy_table_ident,
            &stats,
            config.max_parallelism,
            config.min_size_per_partition,
            config.max_file_count_per_partition,
            config.target_file_size_bytes,
        );
        assert!(result.is_err());
        if let Err(e) = result {
            assert!(
                e.to_string()
                    .contains("No files to calculate_task_parallelism")
            );
        }
    }

    #[test]
    fn test_calculate_parallelism_max_parallelism_zero() {
        let mut config = default_test_config_params();
        config.max_parallelism = 0; // max_parallelism is 0
        let dummy_table_ident = TableIdent::from_strs(["db", "schema", "table"]).unwrap();
        let stats = IcebergCompactionTaskStatistics {
            total_data_file_size: 1024 * 1024 * 50, // 50MB
            total_data_file_count: 5,
            total_pos_del_file_size: 0,
            total_pos_del_file_count: 0,
            total_eq_del_file_size: 0,
            total_eq_del_file_count: 0,
        };
        // max_parallelism = 0, should return Err.
        let result = IcebergCompactorRunner::calculate_task_parallelism_impl(
            1,
            &dummy_table_ident,
            &stats,
            config.max_parallelism,
            config.min_size_per_partition,
            config.max_file_count_per_partition,
            config.target_file_size_bytes,
        );
        assert!(result.is_err());
        if let Err(e) = result {
            assert!(e.to_string().contains("Max parallelism cannot be 0"));
        }
    }

    #[test]
    fn test_calculate_parallelism_only_delete_files() {
        let config = default_test_config_params();
        let dummy_table_ident = TableIdent::from_strs(["db", "schema", "table"]).unwrap();
        let stats = IcebergCompactionTaskStatistics {
            total_data_file_size: 0,
            total_data_file_count: 0,
            total_pos_del_file_size: 1024 * 1024 * 20, // 20MB of pos-delete files
            total_pos_del_file_count: 2,
            total_eq_del_file_size: 0,
            total_eq_del_file_count: 0,
        };
        // total_files_count_for_partitioning = 2
        // total_file_size_for_partitioning = 20MB
        // partition_by_size = 20MB / 100MB = 1
        // partition_by_count = 2 / 10 = 1
        // initial_input = max(1,1) = 1
        // input = min(1, 4_max_p) = 1
        // initial_output = min(partition_by_size=1, input=1, max_p=4) = 1
        // Heuristic: total_data_file_size (0) is not > 0, so it does not apply.
        let (input_p, output_p) = IcebergCompactorRunner::calculate_task_parallelism_impl(
            1,
            &dummy_table_ident,
            &stats,
            config.max_parallelism,
            config.min_size_per_partition,
            config.max_file_count_per_partition,
            config.target_file_size_bytes,
        )
        .unwrap();
        assert_eq!(input_p, 1);
        assert_eq!(output_p, 1);
    }

    #[test]
    fn test_calculate_parallelism_zero_file_count_non_zero_size() {
        let config = default_test_config_params();
        let dummy_table_ident = TableIdent::from_strs(["db", "schema", "table"]).unwrap();
        let stats = IcebergCompactionTaskStatistics {
            total_data_file_size: 1024 * 1024 * 200, // 200MB
            total_data_file_count: 0,                // No data files by count
            total_pos_del_file_size: 0,
            total_pos_del_file_count: 0,
            total_eq_del_file_size: 0,
            total_eq_del_file_count: 0,
        };
        // total_file_size_for_partitioning = 200MB
        // partition_by_size = (200MB / 100MB).ceil().max(1) = 2
        // total_files_count_for_partitioning = 0
        // partition_by_count = (0 / 10).ceil().max(1) = 1
        // input_parallelism = max(2, 1).min(4) = 2
        // output_parallelism = partition_by_size.min(input_parallelism).min(max_parallelism) = 2.min(2).min(4) = 2
        // Heuristic: total_data_file_size (200MB) not < target_file_size_bytes (128MB)
        let (input_p, output_p) = IcebergCompactorRunner::calculate_task_parallelism_impl(
            1,
            &dummy_table_ident,
            &stats,
            config.max_parallelism,
            config.min_size_per_partition,
            config.max_file_count_per_partition,
            config.target_file_size_bytes,
        )
        .unwrap();
        assert_eq!(input_p, 2);
        assert_eq!(output_p, 2);
    }
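
    // Sketch test for `RunnerContext`'s best-effort parallelism accounting.
    #[test]
    fn test_runner_context_parallelism_accounting() {
        let context = RunnerContext::new(4, Arc::new(AtomicU32::new(0)));
        assert!(context.is_available_parallelism_sufficient(4));

        // Reserving 3 units leaves 1 available.
        context.incr_running_task_parallelism(3);
        assert!(context.is_available_parallelism_sufficient(1));
        assert!(!context.is_available_parallelism_sufficient(2));

        // Releasing restores the full budget.
        context.decr_running_task_parallelism(3);
        assert!(context.is_available_parallelism_sufficient(4));
    }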
}