risingwave_storage/hummock/iceberg_compactor_runner.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::BTreeMap;
use std::fmt::Debug;
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::{Arc, LazyLock};

use bergloom_core::compaction::{
    Compaction, CompactionType, RewriteDataFilesCommitManagerRetryConfig,
};
use bergloom_core::config::CompactionConfigBuilder;
use derive_builder::Builder;
use iceberg::{Catalog, TableIdent};
use mixtrics::registry::prometheus::PrometheusMetricsRegistry;
use parquet::basic::Compression;
use parquet::file::properties::WriterProperties;
use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
use risingwave_connector::sink::iceberg::IcebergConfig;
use risingwave_pb::iceberg_compaction::IcebergCompactionTask;
use thiserror_ext::AsReport;
use tokio::sync::oneshot::Receiver;

use super::HummockResult;
use crate::hummock::HummockError;
use crate::monitor::CompactorMetrics;

static BERGLOOM_METRICS_REGISTRY: LazyLock<Box<PrometheusMetricsRegistry>> = LazyLock::new(|| {
    Box::new(PrometheusMetricsRegistry::new(
        GLOBAL_METRICS_REGISTRY.clone(),
    ))
});

pub struct IcebergCompactorRunner {
    pub task_id: u64,
    pub catalog: Arc<dyn Catalog>,
    pub table_ident: TableIdent,
    pub iceberg_config: IcebergConfig,

    config: IcebergCompactorRunnerConfig,
    metrics: Arc<CompactorMetrics>,
}

pub fn default_writer_properties() -> WriterProperties {
    WriterProperties::builder()
        .set_compression(Compression::SNAPPY)
        .set_created_by(concat!("risingwave version ", env!("CARGO_PKG_VERSION")).to_owned())
        .build()
}
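
// A hypothetical variation (not part of the original file) showing how the same
// builder could trade write CPU for a better compression ratio; `ZstdLevel` is
// from the `parquet` crate.
#[allow(dead_code)]
fn zstd_writer_properties() -> WriterProperties {
    use parquet::basic::ZstdLevel;

    WriterProperties::builder()
        // ZSTD generally compresses better than SNAPPY at a higher CPU cost.
        .set_compression(Compression::ZSTD(
            ZstdLevel::try_new(3).expect("valid zstd level"),
        ))
        .set_created_by(concat!("risingwave version ", env!("CARGO_PKG_VERSION")).to_owned())
        .build()
}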

#[derive(Builder, Debug, Clone)]
pub struct IcebergCompactorRunnerConfig {
    #[builder(default = "4")]
    pub max_parallelism: u32,
    #[builder(default = "1024 * 1024 * 1024")] // 1GB
    pub min_size_per_partition: u64,
    #[builder(default = "32")]
    pub max_file_count_per_partition: u32,
    #[builder(default = "1024 * 1024 * 1024")] // 1GB
    pub target_file_size_bytes: u64,
    #[builder(default = "false")]
    pub enable_validate_compaction: bool,
    #[builder(default = "1024")]
    pub max_record_batch_rows: usize,
    #[builder(default = "default_writer_properties()")]
    pub write_parquet_properties: WriterProperties,
}
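
// A usage sketch (hypothetical caller, not part of the original file): the
// `Builder` derive above generates `IcebergCompactorRunnerConfigBuilder` (also
// used by the tests below), and any field left unset falls back to its
// `#[builder(default = ...)]` value.
#[allow(dead_code)]
fn example_runner_config() -> IcebergCompactorRunnerConfig {
    IcebergCompactorRunnerConfigBuilder::default()
        .max_parallelism(8)
        .target_file_size_bytes(256 * 1024 * 1024) // 256MB instead of the 1GB default
        .build()
        .expect("defaults fill the unset fields")
}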

#[derive(Debug, Clone)]
pub(crate) struct RunnerContext {
    pub max_available_parallelism: u32,
    pub running_task_parallelism: Arc<AtomicU32>,
}

impl RunnerContext {
    pub fn new(max_available_parallelism: u32, running_task_parallelism: Arc<AtomicU32>) -> Self {
        Self {
            max_available_parallelism,
            running_task_parallelism,
        }
    }

    pub fn is_available_parallelism_sufficient(&self, input_parallelism: u32) -> bool {
        (self.max_available_parallelism - self.running_task_parallelism.load(Ordering::SeqCst))
            >= input_parallelism
    }

    pub fn incr_running_task_parallelism(&self, increment: u32) {
        self.running_task_parallelism
            .fetch_add(increment, Ordering::SeqCst);
    }

    pub fn decr_running_task_parallelism(&self, decrement: u32) {
        self.running_task_parallelism
            .fetch_sub(decrement, Ordering::SeqCst);
    }
}
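
// A minimal sketch (hypothetical helper, not part of the original file) of the
// admission control this context provides. `compact` below performs the same
// reserve/release dance, using a scopeguard for the release. Note that the
// check and the increment are separate operations, not one atomic step.
#[allow(dead_code)]
fn try_reserve_parallelism(ctx: &RunnerContext, input_parallelism: u32) -> bool {
    if !ctx.is_available_parallelism_sufficient(input_parallelism) {
        // Not enough headroom; the task should be rejected or retried later.
        return false;
    }
    ctx.incr_running_task_parallelism(input_parallelism);
    // ... run the task ...
    ctx.decr_running_task_parallelism(input_parallelism);
    true
}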

impl IcebergCompactorRunner {
    pub async fn new(
        iceberg_compaction_task: IcebergCompactionTask,
        config: IcebergCompactorRunnerConfig,
        metrics: Arc<CompactorMetrics>,
    ) -> HummockResult<Self> {
        let IcebergCompactionTask { task_id, props } = iceberg_compaction_task;
        let iceberg_config = IcebergConfig::from_btreemap(BTreeMap::from_iter(props.into_iter()))
            .map_err(|e| HummockError::compaction_executor(e.as_report()))?;
        let catalog = iceberg_config
            .create_catalog()
            .await
            .map_err(|e| HummockError::compaction_executor(e.as_report()))?;
        let table_ident = iceberg_config
            .full_table_name()
            .map_err(|e| HummockError::compaction_executor(e.as_report()))?;

        Ok(Self {
            task_id,
            catalog,
            table_ident,
            iceberg_config,
            config,
            metrics,
        })
    }

    pub fn calculate_task_parallelism_impl(
        task_id_for_logging: u64,
        table_ident_for_logging: &TableIdent,
        task_statistics: &IcebergCompactionTaskStatistics,
        max_parallelism: u32,
        min_size_per_partition: u64,
        max_file_count_per_partition: u32,
        target_file_size_bytes: u64,
    ) -> HummockResult<(u32, u32)> {
        if max_parallelism == 0 {
            return Err(HummockError::compaction_executor(
                "Max parallelism cannot be 0".to_owned(),
            ));
        }

        let total_file_size_for_partitioning = task_statistics.total_data_file_size
            + task_statistics.total_pos_del_file_size
            + task_statistics.total_eq_del_file_size;
        if total_file_size_for_partitioning == 0 {
            // If the total file size is 0, we cannot partition by size:
            // there are no files to compact.
            tracing::warn!(
                task_id = task_id_for_logging,
                table = ?table_ident_for_logging,
                "Total file size is 0, nothing to compact."
            );

            return Err(HummockError::compaction_executor(
                "No files to calculate_task_parallelism".to_owned(),
            )); // No files, so no partitions.
        }

        let partition_by_size = total_file_size_for_partitioning
            .div_ceil(min_size_per_partition)
            .max(1) as u32; // Ensure at least one partition.

        let total_files_count_for_partitioning = task_statistics.total_data_file_count
            + task_statistics.total_pos_del_file_count
            + task_statistics.total_eq_del_file_count;

        let partition_by_count = total_files_count_for_partitioning
            .div_ceil(max_file_count_per_partition)
            .max(1); // Ensure at least one partition.

        let input_parallelism = partition_by_size
            .max(partition_by_count)
            .min(max_parallelism);

        // `output_parallelism` should not exceed `input_parallelism`
        // and should also not exceed `max_parallelism`.
        // It's primarily driven by size to avoid small output files.
        let mut output_parallelism = partition_by_size
            .min(input_parallelism)
            .min(max_parallelism);

        // Heuristic: if the total task data size is very small (less than `target_file_size_bytes`),
        // force `output_parallelism` to 1 to encourage merging into a single, larger output file.
        if task_statistics.total_data_file_size > 0 // Only apply if there's data.
            && task_statistics.total_data_file_size < target_file_size_bytes
            && output_parallelism > 1
        {
            tracing::debug!(
                task_id = task_id_for_logging,
                table = ?table_ident_for_logging,
                total_data_file_size = task_statistics.total_data_file_size,
                target_file_size_bytes = target_file_size_bytes,
                original_output_parallelism = output_parallelism,
                "Total data size is less than target file size, forcing output_parallelism to 1."
            );
            output_parallelism = 1;
        }

        tracing::debug!(
            task_id = task_id_for_logging,
            table = ?table_ident_for_logging,
            stats = ?task_statistics,
            config_max_parallelism = max_parallelism,
            config_min_size_per_partition = min_size_per_partition,
            config_max_file_count_per_partition = max_file_count_per_partition,
            config_target_file_size_bytes = target_file_size_bytes,
            calculated_partition_by_size = partition_by_size,
            calculated_partition_by_count = partition_by_count,
            final_input_parallelism = input_parallelism,
            final_output_parallelism = output_parallelism,
            "Calculated task parallelism"
        );

        Ok((input_parallelism, output_parallelism))
    }
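
    // In summary, the logic above computes:
    //   partition_by_size  = max(ceil(total_file_size / min_size_per_partition), 1)
    //   partition_by_count = max(ceil(total_file_count / max_file_count_per_partition), 1)
    //   input_parallelism  = min(max(partition_by_size, partition_by_count), max_parallelism)
    //   output_parallelism = min(partition_by_size, input_parallelism), forced to 1 when
    //                        0 < total_data_file_size < target_file_size_bytes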

    fn calculate_task_parallelism(
        &self,
        task_statistics: &IcebergCompactionTaskStatistics,
        max_parallelism: u32,
        min_size_per_partition: u64,
        max_file_count_per_partition: u32,
        target_file_size_bytes: u64,
    ) -> HummockResult<(u32, u32)> {
        if max_parallelism == 0 {
            return Err(HummockError::compaction_executor(
                "Max parallelism cannot be 0".to_owned(),
            ));
        }

        Self::calculate_task_parallelism_impl(
            self.task_id,
            &self.table_ident,
            task_statistics,
            max_parallelism,
            min_size_per_partition,
            max_file_count_per_partition,
            target_file_size_bytes,
        )
    }

    pub async fn compact(
        self,
        context: RunnerContext,
        shutdown_rx: Receiver<()>,
    ) -> HummockResult<()> {
        let task_id = self.task_id;
        let now = std::time::Instant::now();

        let compact = async move {
            let retry_config = RewriteDataFilesCommitManagerRetryConfig::default();
            let statistics = self
                .analyze_task_statistics()
                .await
                .map_err(|e| HummockError::compaction_executor(e.as_report()))?;

            let (input_parallelism, output_parallelism) = self
                .calculate_task_parallelism(
                    &statistics,
                    self.config.max_parallelism,
                    self.config.min_size_per_partition,
                    self.config.max_file_count_per_partition,
                    self.config.target_file_size_bytes,
                )
                .map_err(|e| HummockError::compaction_executor(e.as_report()))?;

            if !context.is_available_parallelism_sufficient(input_parallelism) {
                tracing::warn!(
                    "Available parallelism is less than input parallelism {}; task {} will not run",
                    input_parallelism,
                    task_id
                );
                return Err(HummockError::compaction_executor(
                    "Available parallelism is less than input parallelism",
                ));
            }

            let compaction_config = Arc::new(
                CompactionConfigBuilder::default()
                    .batch_parallelism(input_parallelism as usize)
                    .target_partitions(output_parallelism as usize)
                    .target_file_size(self.config.target_file_size_bytes)
                    .enable_validate_compaction(self.config.enable_validate_compaction)
                    .max_record_batch_rows(self.config.max_record_batch_rows)
                    .write_parquet_properties(self.config.write_parquet_properties.clone())
                    .build()
                    .unwrap_or_else(|e| {
                        panic!(
                            "Failed to build iceberg compaction write props: {:?}",
                            e.as_report()
                        );
                    }),
            );

            let compaction = Compaction::builder()
                .with_catalog(self.catalog.clone())
                .with_catalog_name(self.iceberg_config.catalog_name())
                .with_compaction_type(CompactionType::Full)
                .with_config(compaction_config)
                .with_table_ident(self.table_ident.clone())
                .with_executor_type(bergloom_core::executor::ExecutorType::DataFusion)
                .with_registry(BERGLOOM_METRICS_REGISTRY.clone())
                .with_retry_config(retry_config)
                .build()
                .await
                .map_err(|e| HummockError::compaction_executor(e.as_report()))?;

            context.incr_running_task_parallelism(input_parallelism);
            self.metrics.compact_task_pending_num.inc();
            self.metrics
                .compact_task_pending_parallelism
                .add(input_parallelism as _);

            // Release the reserved parallelism and pending-task metrics when the
            // compaction future completes (or is dropped on cancellation).
            let _release_guard = scopeguard::guard(
                (input_parallelism, context.clone(), self.metrics.clone()),
                |(val, ctx, metrics_guard)| {
                    ctx.decr_running_task_parallelism(val);
                    metrics_guard.compact_task_pending_num.dec();
                    metrics_guard.compact_task_pending_parallelism.sub(val as _);
                },
            );

            tracing::info!(
                "Iceberg compaction task {} {:?} started with input parallelism {} and output parallelism {}",
                task_id,
                format!(
                    "{:?}-{:?}",
                    self.iceberg_config.catalog_name(),
                    self.table_ident
                ),
                input_parallelism,
                output_parallelism
            );
            compaction
                .compact()
                .await
                .map_err(|e| HummockError::compaction_executor(e.as_report()))?;
            Ok::<(), HummockError>(())
        };

        tokio::select! {
            _ = shutdown_rx => {
                tracing::info!("Iceberg compaction task {} cancelled", task_id);
            }
            result = compact => {
                if let Err(e) = result {
                    tracing::warn!(
                        error = %e.as_report(),
                        "Compaction task {} failed with error",
                        task_id,
                    );
                }
            }
        }

        tracing::info!(
            "Iceberg compaction task {} finished, cost {} ms",
            task_id,
            now.elapsed().as_millis()
        );

        Ok(())
    }
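
    // A usage sketch (hypothetical caller, not part of the original file): the
    // shutdown channel is a plain tokio oneshot, so signalling the sender (or
    // dropping it) cancels the task through the `tokio::select!` above.
    //
    //   let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel();
    //   let handle = tokio::spawn(runner.compact(context, shutdown_rx));
    //   // ... later, to cancel:
    //   let _ = shutdown_tx.send(());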

    async fn analyze_task_statistics(&self) -> HummockResult<IcebergCompactionTaskStatistics> {
        let table = self
            .catalog
            .load_table(&self.table_ident)
            .await
            .map_err(|e| HummockError::compaction_executor(e.as_report()))?;
        let manifest_list = table
            .metadata()
            .current_snapshot()
            .ok_or_else(|| HummockError::compaction_executor("Current snapshot not found"))?
            .load_manifest_list(table.file_io(), table.metadata())
            .await
            .map_err(|e| HummockError::compaction_executor(e.as_report()))?;

        let mut total_data_file_size: u64 = 0;
        let mut total_data_file_count = 0;
        let mut total_pos_del_file_size: u64 = 0;
        let mut total_pos_del_file_count = 0;
        let mut total_eq_del_file_size: u64 = 0;
        let mut total_eq_del_file_count = 0;

        for manifest_file in manifest_list.entries() {
            let manifest = manifest_file
                .load_manifest(table.file_io())
                .await
                .map_err(|e| HummockError::compaction_executor(e.as_report()))?;
            let (entries, _) = manifest.into_parts();
            for entry in entries {
                match entry.content_type() {
                    iceberg::spec::DataContentType::Data => {
                        total_data_file_size += entry.data_file().file_size_in_bytes();
                        total_data_file_count += 1;
                    }
                    iceberg::spec::DataContentType::EqualityDeletes => {
                        total_eq_del_file_size += entry.data_file().file_size_in_bytes();
                        total_eq_del_file_count += 1;
                    }
                    iceberg::spec::DataContentType::PositionDeletes => {
                        total_pos_del_file_size += entry.data_file().file_size_in_bytes();
                        total_pos_del_file_count += 1;
                    }
                }
            }
        }

        Ok(IcebergCompactionTaskStatistics {
            total_data_file_size,
            total_data_file_count: total_data_file_count as u32,
            total_pos_del_file_size,
            total_pos_del_file_count: total_pos_del_file_count as u32,
            total_eq_del_file_size,
            total_eq_del_file_count: total_eq_del_file_count as u32,
        })
    }
}

pub struct IcebergCompactionTaskStatistics {
    pub total_data_file_size: u64,
    pub total_data_file_count: u32,
    pub total_pos_del_file_size: u64,
    pub total_pos_del_file_count: u32,
    pub total_eq_del_file_size: u64,
    pub total_eq_del_file_count: u32,
}

impl Debug for IcebergCompactionTaskStatistics {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("IcebergCompactionTaskStatistics")
            .field("total_data_file_size", &self.total_data_file_size)
            .field("total_data_file_count", &self.total_data_file_count)
            .field("total_pos_del_file_size", &self.total_pos_del_file_size)
            .field("total_pos_del_file_count", &self.total_pos_del_file_count)
            .field("total_eq_del_file_size", &self.total_eq_del_file_size)
            .field("total_eq_del_file_count", &self.total_eq_del_file_count)
            .finish()
    }
}

#[cfg(test)]
mod tests {
    use iceberg::TableIdent;

    use super::*;

    fn default_test_config_params() -> IcebergCompactorRunnerConfig {
        IcebergCompactorRunnerConfigBuilder::default()
            .max_parallelism(4)
            .min_size_per_partition(1024 * 1024 * 100) // 100MB
            .max_file_count_per_partition(10)
            .target_file_size_bytes(1024 * 1024 * 128) // 128MB
            .build()
            .unwrap()
    }

    #[test]
    fn test_calculate_parallelism_no_files() {
        let config = default_test_config_params();
        let dummy_table_ident = TableIdent::from_strs(["db", "schema", "table"]).unwrap();
        let stats = IcebergCompactionTaskStatistics {
            total_data_file_size: 0,
            total_data_file_count: 0,
            total_pos_del_file_size: 0,
            total_pos_del_file_count: 0,
            total_eq_del_file_size: 0,
            total_eq_del_file_count: 0,
        };
        let result = IcebergCompactorRunner::calculate_task_parallelism_impl(
            1,
            &dummy_table_ident,
            &stats,
            config.max_parallelism,
            config.min_size_per_partition,
            config.max_file_count_per_partition,
            config.target_file_size_bytes,
        );

        assert!(result.is_err());
        if let Err(e) = result {
            assert!(
                e.to_string()
                    .contains("No files to calculate_task_parallelism")
            );
        }
    }

    #[test]
    fn test_calculate_parallelism_size_dominant() {
        let config = default_test_config_params();
        let dummy_table_ident = TableIdent::from_strs(["db", "schema", "table"]).unwrap();
        let stats = IcebergCompactionTaskStatistics {
            total_data_file_size: 1024 * 1024 * 500, // 500MB
            total_data_file_count: 5,                // Low count
            total_pos_del_file_size: 0,
            total_pos_del_file_count: 0,
            total_eq_del_file_size: 0,
            total_eq_del_file_count: 0,
        };
        // partition_by_size = 500MB / 100MB = 5
        // partition_by_count = 5 / 10 = 1 (div_ceil)
        // initial_input = max(5,1) = 5
        // input = min(5, 4_max_p) = 4
        // output = min(partition_by_size=5, input=4, max_p=4) = 4
        let (input_p, output_p) = IcebergCompactorRunner::calculate_task_parallelism_impl(
            1,
            &dummy_table_ident,
            &stats,
            config.max_parallelism,
            config.min_size_per_partition,
            config.max_file_count_per_partition,
            config.target_file_size_bytes,
        )
        .unwrap();
        assert_eq!(input_p, 4);
        assert_eq!(output_p, 4);
    }

    #[test]
    fn test_calculate_parallelism_count_dominant() {
        let config = default_test_config_params();
        let dummy_table_ident = TableIdent::from_strs(["db", "schema", "table"]).unwrap();
        let stats = IcebergCompactionTaskStatistics {
            total_data_file_size: 1024 * 1024 * 50, // 50MB (low size)
            total_data_file_count: 35,              // High count
            total_pos_del_file_size: 0,
            total_pos_del_file_count: 0,
            total_eq_del_file_size: 0,
            total_eq_del_file_count: 0,
        };
        // partition_by_size = 50MB / 100MB = 1 (div_ceil)
        // partition_by_count = 35 / 10 = 4 (div_ceil)
        // initial_input = max(1,4) = 4
        // input = min(4, 4_max_p) = 4
        // output = min(partition_by_size=1, input=4, max_p=4) = 1
        let (input_p, output_p) = IcebergCompactorRunner::calculate_task_parallelism_impl(
            1,
            &dummy_table_ident,
            &stats,
            config.max_parallelism,
            config.min_size_per_partition,
            config.max_file_count_per_partition,
            config.target_file_size_bytes,
        )
        .unwrap();
        assert_eq!(input_p, 4);
        assert_eq!(output_p, 1);
    }

    #[test]
    fn test_calculate_parallelism_max_parallelism_cap() {
        let mut config = default_test_config_params();
        config.max_parallelism = 2; // Lower max_parallelism
        let dummy_table_ident = TableIdent::from_strs(["db", "schema", "table"]).unwrap();
        let stats = IcebergCompactionTaskStatistics {
            total_data_file_size: 1024 * 1024 * 500, // 500MB
            total_data_file_count: 35,               // High count
            total_pos_del_file_size: 0,
            total_pos_del_file_count: 0,
            total_eq_del_file_size: 0,
            total_eq_del_file_count: 0,
        };
        // partition_by_size = 500MB / 100MB = 5
        // partition_by_count = 35 / 10 = 4
        // initial_input = max(5,4) = 5
        // input = min(5, 2_max_p) = 2
        // output = min(partition_by_size=5, input=2, max_p=2) = 2
        let (input_p, output_p) = IcebergCompactorRunner::calculate_task_parallelism_impl(
            1,
            &dummy_table_ident,
            &stats,
            config.max_parallelism,
            config.min_size_per_partition,
            config.max_file_count_per_partition,
            config.target_file_size_bytes,
        )
        .unwrap();
        assert_eq!(input_p, 2);
        assert_eq!(output_p, 2);
    }

    #[test]
    fn test_calculate_parallelism_small_data_heuristic() {
        let config = default_test_config_params(); // target_file_size_bytes = 128MB
        let dummy_table_ident = TableIdent::from_strs(["db", "schema", "table"]).unwrap();
        // To exercise the heuristic, partition_by_size must exceed 1 while the
        // data size alone stays below the target file size, so delete files are
        // used here to pad the total size for partitioning.
        let stats_for_heuristic = IcebergCompactionTaskStatistics {
            total_data_file_size: 1024 * 1024 * 60,     // 60MB
            total_data_file_count: 5,                   // Low count, so size dominates for input
            total_pos_del_file_size: 1024 * 1024 * 250, // Makes total size 310MB
            total_pos_del_file_count: 2,
            total_eq_del_file_size: 0,
            total_eq_del_file_count: 0,
        };
        // total_file_size_for_partitioning = 310MB
        // partition_by_size = 310MB / 100MB = 4 (div_ceil)
        // total_files_count_for_partitioning = 5 + 2 = 7
        // partition_by_count = 7 / 10 = 1
        // initial_input = max(4,1) = 4
        // input = min(4, 4_max_p) = 4
        // initial_output = min(partition_by_size=4, input=4, max_p=4) = 4
        // Heuristic: total_data_file_size (60MB) < target_file_size_bytes (128MB)
        // AND initial_output (4) > 1, so output becomes 1.
        let (input_p, output_p) = IcebergCompactorRunner::calculate_task_parallelism_impl(
            1,
            &dummy_table_ident,
            &stats_for_heuristic,
            config.max_parallelism,
            config.min_size_per_partition,
            config.max_file_count_per_partition,
            config.target_file_size_bytes,
        )
        .unwrap();
        assert_eq!(input_p, 4);
        assert_eq!(output_p, 1);
    }

    #[test]
    fn test_calculate_parallelism_all_empty_files() {
        let config = default_test_config_params();
        let dummy_table_ident = TableIdent::from_strs(["db", "schema", "table"]).unwrap();
        let stats = IcebergCompactionTaskStatistics {
            total_data_file_size: 0,
            total_data_file_count: 5, // 5 empty data files
            total_pos_del_file_size: 0,
            total_pos_del_file_count: 0,
            total_eq_del_file_size: 0,
            total_eq_del_file_count: 0,
        };
        // total_file_size_for_partitioning = 0, should return Err.
        let result = IcebergCompactorRunner::calculate_task_parallelism_impl(
            1,
            &dummy_table_ident,
            &stats,
            config.max_parallelism,
            config.min_size_per_partition,
            config.max_file_count_per_partition,
            config.target_file_size_bytes,
        );
        assert!(result.is_err());
        if let Err(e) = result {
            assert!(
                e.to_string()
                    .contains("No files to calculate_task_parallelism")
            );
        }
    }

    #[test]
    fn test_calculate_parallelism_max_parallelism_zero() {
        let mut config = default_test_config_params();
        config.max_parallelism = 0; // max_parallelism is 0
        let dummy_table_ident = TableIdent::from_strs(["db", "schema", "table"]).unwrap();
        let stats = IcebergCompactionTaskStatistics {
            total_data_file_size: 1024 * 1024 * 50, // 50MB
            total_data_file_count: 5,
            total_pos_del_file_size: 0,
            total_pos_del_file_count: 0,
            total_eq_del_file_size: 0,
            total_eq_del_file_count: 0,
        };
        // max_parallelism = 0, should return Err.
        let result = IcebergCompactorRunner::calculate_task_parallelism_impl(
            1,
            &dummy_table_ident,
            &stats,
            config.max_parallelism,
            config.min_size_per_partition,
            config.max_file_count_per_partition,
            config.target_file_size_bytes,
        );
        assert!(result.is_err());
        if let Err(e) = result {
            assert!(e.to_string().contains("Max parallelism cannot be 0"));
        }
    }

    #[test]
    fn test_calculate_parallelism_only_delete_files() {
        let config = default_test_config_params();
        let dummy_table_ident = TableIdent::from_strs(["db", "schema", "table"]).unwrap();
        let stats = IcebergCompactionTaskStatistics {
            total_data_file_size: 0,
            total_data_file_count: 0,
            total_pos_del_file_size: 1024 * 1024 * 20, // 20MB of pos-delete files
            total_pos_del_file_count: 2,
            total_eq_del_file_size: 0,
            total_eq_del_file_count: 0,
        };
        // total_files_count_for_partitioning = 2
        // total_file_size_for_partitioning = 20MB
        // partition_by_size = 20MB / 100MB = 1
        // partition_by_count = 2 / 10 = 1
        // initial_input = max(1,1) = 1
        // input = min(1, 4_max_p) = 1
        // initial_output = min(partition_by_size=1, input=1, max_p=4) = 1
        // Heuristic: total_data_file_size (0) is not > 0.
        let (input_p, output_p) = IcebergCompactorRunner::calculate_task_parallelism_impl(
            1,
            &dummy_table_ident,
            &stats,
            config.max_parallelism,
            config.min_size_per_partition,
            config.max_file_count_per_partition,
            config.target_file_size_bytes,
        )
        .unwrap();
        assert_eq!(input_p, 1);
        assert_eq!(output_p, 1);
    }

    #[test]
    fn test_calculate_parallelism_zero_file_count_non_zero_size() {
        let config = default_test_config_params();
        let dummy_table_ident = TableIdent::from_strs(["db", "schema", "table"]).unwrap();
        let stats = IcebergCompactionTaskStatistics {
            total_data_file_size: 1024 * 1024 * 200, // 200MB
            total_data_file_count: 0,                // No data files by count
            total_pos_del_file_size: 0,
            total_pos_del_file_count: 0,
            total_eq_del_file_size: 0,
            total_eq_del_file_count: 0,
        };
        // total_file_size_for_partitioning = 200MB
        // partition_by_size = (200MB / 100MB).ceil().max(1) = 2
        // total_files_count_for_partitioning = 0
        // partition_by_count = (0 / 10).ceil().max(1) = 1
        // input_parallelism = max(2, 1).min(4) = 2
        // output_parallelism = partition_by_size.min(input_parallelism).min(max_parallelism) = 2.min(2).min(4) = 2
        // Heuristic: total_data_file_size (200MB) not < target_file_size_bytes (128MB)
        let (input_p, output_p) = IcebergCompactorRunner::calculate_task_parallelism_impl(
            1,
            &dummy_table_ident,
            &stats,
            config.max_parallelism,
            config.min_size_per_partition,
            config.max_file_count_per_partition,
            config.target_file_size_bytes,
        )
        .unwrap();
        assert_eq!(input_p, 2);
        assert_eq!(output_p, 2);
    }
}
763}