risingwave_storage/hummock/
iceberg_compactor_runner.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::BTreeMap;
16use std::fmt::Debug;
17use std::sync::atomic::{AtomicU32, Ordering};
18use std::sync::{Arc, LazyLock};
19
20use derive_builder::Builder;
21use iceberg::spec::MAIN_BRANCH;
22use iceberg::{Catalog, TableIdent};
23use iceberg_compaction_core::compaction::{
24    CommitConsistencyParams, CommitManagerRetryConfig, CompactionBuilder, CompactionPlan,
25    CompactionPlanner, CompactionResult, CompactionType,
26};
27use iceberg_compaction_core::config::{
28    CompactionBaseConfig, CompactionExecutionConfigBuilder, CompactionPlanningConfigBuilder,
29};
30use iceberg_compaction_core::executor::RewriteFilesStat;
31use mixtrics::registry::prometheus::PrometheusMetricsRegistry;
32use parquet::basic::Compression;
33use parquet::file::properties::WriterProperties;
34use risingwave_common::config::storage::default::storage::{
35    iceberg_compaction_enable_dynamic_size_estimation,
36    iceberg_compaction_enable_heuristic_output_parallelism,
37    iceberg_compaction_max_concurrent_closes, iceberg_compaction_size_estimation_smoothing_factor,
38};
39use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
40use risingwave_connector::sink::iceberg::{
41    IcebergConfig, commit_branch, should_enable_iceberg_cow,
42};
43use risingwave_pb::iceberg_compaction::IcebergCompactionTask;
44use risingwave_pb::iceberg_compaction::iceberg_compaction_task::TaskType;
45use thiserror_ext::AsReport;
46use tokio::sync::oneshot::Receiver;
47
48use super::HummockResult;
49use crate::hummock::HummockError;
50use crate::monitor::CompactorMetrics;
51
52static ICEBERG_COMPACTION_METRICS_REGISTRY: LazyLock<Box<PrometheusMetricsRegistry>> =
53    LazyLock::new(|| {
54        Box::new(PrometheusMetricsRegistry::new(
55            GLOBAL_METRICS_REGISTRY.clone(),
56        ))
57    });
58
59pub struct IcebergCompactorRunner {
60    pub task_id: u64,
61    pub catalog: Arc<dyn Catalog>,
62    pub table_ident: TableIdent,
63    pub iceberg_config: IcebergConfig,
64
65    config: IcebergCompactorRunnerConfig,
66    metrics: Arc<CompactorMetrics>,
67    pub task_type: TaskType,
68}
69
70pub fn default_writer_properties() -> WriterProperties {
71    WriterProperties::builder()
72        .set_compression(Compression::SNAPPY)
73        .set_created_by(concat!("risingwave version ", env!("CARGO_PKG_VERSION")).to_owned())
74        .build()
75}
76
77#[derive(Builder, Debug, Clone)]
78pub struct IcebergCompactorRunnerConfig {
79    #[builder(default = "4")]
80    pub max_parallelism: u32,
81    #[builder(default = "1024 * 1024 * 1024")] // 1GB"
82    pub min_size_per_partition: u64,
83    #[builder(default = "32")]
84    pub max_file_count_per_partition: u32,
85    #[builder(default = "1024 * 1024 * 1024")] // 1GB
86    pub target_file_size_bytes: u64,
87    #[builder(default = "false")]
88    pub enable_validate_compaction: bool,
89    #[builder(default = "1024")]
90    pub max_record_batch_rows: usize,
91    #[builder(default = "default_writer_properties()")]
92    pub write_parquet_properties: WriterProperties,
93    #[builder(default = "32 * 1024 * 1024")] // 32MB
94    pub small_file_threshold: u64,
95    #[builder(default = "50 * 1024 * 1024 * 1024")] // 50GB
96    pub max_task_total_size: u64,
97    #[builder(default = "iceberg_compaction_enable_heuristic_output_parallelism()")]
98    pub enable_heuristic_output_parallelism: bool,
99    #[builder(default = "iceberg_compaction_max_concurrent_closes()")]
100    pub max_concurrent_closes: usize,
101    #[builder(default = "iceberg_compaction_enable_dynamic_size_estimation()")]
102    pub enable_dynamic_size_estimation: bool,
103    #[builder(default = "iceberg_compaction_size_estimation_smoothing_factor()")]
104    pub size_estimation_smoothing_factor: f64,
105}
106
/// Book-keeping of the executor parallelism reserved by running iceberg compaction tasks.
///
/// Clones share the same `running_task_parallelism` counter through the `Arc`.
#[derive(Debug, Clone)]
pub(crate) struct RunnerContext {
    /// Total parallelism that running tasks may reserve together.
    pub max_available_parallelism: u32,
    /// Parallelism currently reserved by running tasks (shared across clones).
    pub running_task_parallelism: Arc<AtomicU32>,
}

impl RunnerContext {
    pub fn new(max_available_parallelism: u32, running_task_parallelism: Arc<AtomicU32>) -> Self {
        Self {
            max_available_parallelism,
            running_task_parallelism,
        }
    }

    /// Returns whether `input_parallelism` more units fit under `max_available_parallelism`.
    ///
    /// This is only a snapshot: nothing is reserved, and concurrent callers can both observe
    /// room and then both increment. The running total can therefore exceed the maximum. The
    /// subtraction saturates so an over-committed context reports "no room". Without that, it
    /// would panic (debug) or wrap around and report plenty of room (release).
    pub fn is_available_parallelism_sufficient(&self, input_parallelism: u32) -> bool {
        let running = self.running_task_parallelism.load(Ordering::SeqCst);
        self.max_available_parallelism.saturating_sub(running) >= input_parallelism
    }

    /// Reserves `increment` units of parallelism.
    pub fn incr_running_task_parallelism(&self, increment: u32) {
        self.running_task_parallelism
            .fetch_add(increment, Ordering::SeqCst);
    }

    /// Releases `decrement` units of parallelism; must mirror an earlier increment.
    pub fn decr_running_task_parallelism(&self, decrement: u32) {
        self.running_task_parallelism
            .fetch_sub(decrement, Ordering::SeqCst);
    }
}
136
137impl IcebergCompactorRunner {
138    pub async fn new(
139        iceberg_compaction_task: IcebergCompactionTask,
140        config: IcebergCompactorRunnerConfig,
141        metrics: Arc<CompactorMetrics>,
142    ) -> HummockResult<Self> {
143        let IcebergCompactionTask {
144            task_id,
145            props,
146            task_type,
147        } = iceberg_compaction_task;
148        let iceberg_config = IcebergConfig::from_btreemap(BTreeMap::from_iter(props.into_iter()))
149            .map_err(|e| HummockError::compaction_executor(e.as_report()))?;
150        let catalog = iceberg_config
151            .create_catalog()
152            .await
153            .map_err(|e| HummockError::compaction_executor(e.as_report()))?;
154        let table_ident = iceberg_config
155            .full_table_name()
156            .map_err(|e| HummockError::compaction_executor(e.as_report()))?;
157
158        Ok(Self {
159            task_id,
160            catalog,
161            table_ident,
162            iceberg_config,
163            config,
164            metrics,
165            task_type: TaskType::try_from(task_type).map_err(|e| {
166                HummockError::compaction_executor(format!("Invalid task type: {}", e.as_report()))
167            })?,
168        })
169    }
170
171    pub async fn compact(
172        self,
173        context: RunnerContext,
174        shutdown_rx: Receiver<()>,
175    ) -> HummockResult<()> {
176        let task_id = self.task_id;
177        let now = std::time::Instant::now();
178
179        let compact = async move {
180            let compaction_type = Self::get_compaction_type(self.task_type);
181            let planning_config = CompactionPlanningConfigBuilder::default()
182                .max_parallelism(self.config.max_parallelism as usize)
183                .min_size_per_partition(self.config.min_size_per_partition)
184                .max_file_count_per_partition(self.config.max_file_count_per_partition as _)
185                .base(CompactionBaseConfig {
186                    target_file_size: self.config.target_file_size_bytes,
187                })
188                .enable_heuristic_output_parallelism(
189                    self.config.enable_heuristic_output_parallelism,
190                )
191                .small_file_threshold(self.config.small_file_threshold)
192                .max_task_total_size(self.config.max_task_total_size)
193                .build()
194                .unwrap_or_else(|e| {
195                    panic!(
196                        "Failed to build iceberg compaction planning config: {:?}",
197                        e.as_report()
198                    );
199                });
200
201            let branch = commit_branch(
202                self.iceberg_config.r#type.as_str(),
203                self.iceberg_config.write_mode.as_str(),
204            );
205
206            let planner = CompactionPlanner::new(planning_config.clone());
207
208            let table = self
209                .catalog
210                .load_table(&self.table_ident)
211                .await
212                .map_err(|e| HummockError::compaction_executor(e.as_report()))?;
213
214            let compaction_plan = planner
215                .plan_compaction_with_branch(&table, compaction_type, &branch)
216                .await
217                .map_err(|e| HummockError::compaction_executor(e.as_report()))?;
218
219            let statistics = self.analyze_task_statistics(&compaction_plan);
220
221            let input_parallelism = compaction_plan.recommended_executor_parallelism() as u32;
222            let output_parallelism = compaction_plan.recommended_output_parallelism() as u32;
223
224            if !context.is_available_parallelism_sufficient(
225                compaction_plan.recommended_executor_parallelism() as _,
226            ) {
227                tracing::warn!(
228                    task_id = task_id,
229                    table = ?self.table_ident,
230                    input_parallelism = input_parallelism,
231                    "Available parallelism is less than input parallelism task will not run",
232                );
233                return Err(HummockError::compaction_executor(
234                    "Available parallelism is less than input parallelism",
235                ));
236            }
237
238            let compaction_execution_config = CompactionExecutionConfigBuilder::default()
239                .enable_validate_compaction(self.config.enable_validate_compaction)
240                .max_record_batch_rows(self.config.max_record_batch_rows)
241                .write_parquet_properties(self.config.write_parquet_properties.clone())
242                .base(CompactionBaseConfig {
243                    target_file_size: self.config.target_file_size_bytes,
244                })
245                .max_concurrent_closes(self.config.max_concurrent_closes)
246                .enable_dynamic_size_estimation(self.config.enable_dynamic_size_estimation)
247                .size_estimation_smoothing_factor(self.config.size_estimation_smoothing_factor)
248                .build()
249                .unwrap_or_else(|e| {
250                    panic!(
251                        "Failed to build iceberg compaction write props: {:?}",
252                        e.as_report()
253                    );
254                });
255
256            tracing::info!(
257                task_id = task_id,
258                task_type = ?self.task_type,
259                table = ?self.table_ident,
260                input_parallelism = input_parallelism,
261                output_parallelism = output_parallelism,
262                statistics = ?statistics,
263                planning_config = ?planning_config,
264                "Iceberg compaction task started",
265            );
266
267            let retry_config = CommitManagerRetryConfig::default();
268            let compaction = CompactionBuilder::new(
269                self.catalog.clone(),
270                self.table_ident.clone(),
271                CompactionType::Full,
272            )
273            .with_catalog_name(self.iceberg_config.catalog_name())
274            .with_executor_type(iceberg_compaction_core::executor::ExecutorType::DataFusion)
275            .with_registry(ICEBERG_COMPACTION_METRICS_REGISTRY.clone())
276            .with_retry_config(retry_config)
277            .with_to_branch(branch)
278            .build();
279
280            context.incr_running_task_parallelism(input_parallelism);
281            self.metrics.compact_task_pending_num.inc();
282            self.metrics
283                .compact_task_pending_parallelism
284                .add(input_parallelism as _);
285
286            let _release_guard = scopeguard::guard(
287                (input_parallelism, context.clone(), self.metrics.clone()), /* metrics.clone() if Arc */
288                |(val, ctx, metrics_guard)| {
289                    ctx.decr_running_task_parallelism(val);
290                    metrics_guard.compact_task_pending_num.dec();
291                    metrics_guard.compact_task_pending_parallelism.sub(val as _);
292                },
293            );
294
295            let CompactionResult {
296                data_files,
297                stats,
298                table,
299            } = compaction
300                .compact_with_plan(compaction_plan, &compaction_execution_config)
301                .await
302                .map_err(|e| HummockError::compaction_executor(e.as_report()))?;
303
304            if let Some(committed_table) = table
305                && should_enable_iceberg_cow(
306                    self.iceberg_config.r#type.as_str(),
307                    self.iceberg_config.write_mode.as_str(),
308                )
309            {
310                let ingestion_branch = commit_branch(
311                    self.iceberg_config.r#type.as_str(),
312                    self.iceberg_config.write_mode.as_str(),
313                );
314
315                // Overwrite Main branch
316                let consistency_params = CommitConsistencyParams {
317                    starting_snapshot_id: committed_table
318                        .metadata()
319                        .snapshot_for_ref(ingestion_branch.as_str())
320                        .ok_or(HummockError::compaction_executor(anyhow::anyhow!(
321                            "Don't find current_snapshot for ingestion_branch {}",
322                            ingestion_branch
323                        )))?
324                        .snapshot_id(),
325                    use_starting_sequence_number: true,
326                    basic_schema_id: committed_table.metadata().current_schema().schema_id(),
327                };
328
329                let commit_manager = compaction.build_commit_manager(consistency_params);
330
331                let input_files = {
332                    let mut input_files = vec![];
333                    if let Some(snapshot) = committed_table.metadata().snapshot_for_ref(MAIN_BRANCH)
334                    {
335                        let manifest_list = snapshot
336                            .load_manifest_list(
337                                committed_table.file_io(),
338                                committed_table.metadata(),
339                            )
340                            .await
341                            .map_err(|e| HummockError::compaction_executor(e.as_report()))?;
342
343                        for manifest_file in manifest_list
344                            .entries()
345                            .iter()
346                            .filter(|entry| entry.has_added_files() || entry.has_existing_files())
347                        {
348                            let manifest = manifest_file
349                                .load_manifest(committed_table.file_io())
350                                .await
351                                .map_err(|e| HummockError::compaction_executor(e.as_report()))?;
352                            let (entry, _) = manifest.into_parts();
353                            for i in entry {
354                                match i.content_type() {
355                                    iceberg::spec::DataContentType::Data => {
356                                        input_files.push(i.data_file().clone());
357                                    }
358                                    iceberg::spec::DataContentType::EqualityDeletes => {
359                                        unreachable!(
360                                            "Equality deletes are not supported in main branch"
361                                        );
362                                    }
363                                    iceberg::spec::DataContentType::PositionDeletes => {
364                                        unreachable!(
365                                            "Position deletes are not supported in main branch"
366                                        );
367                                    }
368                                }
369                            }
370                        }
371
372                        input_files
373                    } else {
374                        vec![]
375                    }
376                };
377
378                let _new_table = commit_manager
379                    .overwrite_files(data_files, input_files, MAIN_BRANCH)
380                    .await
381                    .map_err(|e| HummockError::compaction_executor(e.as_report()))?;
382            }
383
384            Ok::<RewriteFilesStat, HummockError>(stats)
385        };
386
387        tokio::select! {
388            _ = shutdown_rx => {
389                tracing::info!(task_id = task_id, "Iceberg compaction task cancelled");
390            }
391            stat = compact => {
392                match stat {
393                    Ok(stat) => {
394                        tracing::info!(
395                            task_id = task_id,
396                            elapsed_millis = now.elapsed().as_millis(),
397                            stat = ?stat,
398                            "Iceberg compaction task finished",
399                        );
400                    }
401
402                    Err(e) => {
403                        tracing::warn!(
404                            error = %e.as_report(),
405                            task_id = task_id,
406                            "Iceberg compaction task failed with error",
407                        );
408                    }
409                }
410            }
411        }
412
413        Ok(())
414    }
415
416    fn analyze_task_statistics(&self, plan: &CompactionPlan) -> IcebergCompactionTaskStatistics {
417        let mut total_data_file_size: u64 = 0;
418        let mut total_data_file_count = 0;
419        let mut total_pos_del_file_size: u64 = 0;
420        let mut total_pos_del_file_count = 0;
421        let mut total_eq_del_file_size: u64 = 0;
422        let mut total_eq_del_file_count = 0;
423
424        for data_file in &plan.files_to_compact.data_files {
425            total_data_file_size += data_file.file_size_in_bytes;
426            total_data_file_count += 1;
427        }
428
429        for pos_del_file in &plan.files_to_compact.position_delete_files {
430            total_pos_del_file_size += pos_del_file.file_size_in_bytes;
431            total_pos_del_file_count += 1;
432        }
433
434        for eq_del_file in &plan.files_to_compact.equality_delete_files {
435            total_eq_del_file_size += eq_del_file.file_size_in_bytes;
436            total_eq_del_file_count += 1;
437        }
438
439        IcebergCompactionTaskStatistics {
440            total_data_file_size,
441            total_data_file_count,
442            total_pos_del_file_size,
443            total_pos_del_file_count,
444            total_eq_del_file_size,
445            total_eq_del_file_count,
446        }
447    }
448
449    fn get_compaction_type(task_type: TaskType) -> CompactionType {
450        match task_type {
451            TaskType::SmallDataFileCompaction => CompactionType::MergeSmallDataFiles,
452            TaskType::FullCompaction => CompactionType::Full,
453            _ => {
454                unreachable!(
455                    "Unexpected task type for Iceberg compaction: {:?}",
456                    task_type
457                )
458            }
459        }
460    }
461}
462
/// Size and count totals of the files selected by a compaction plan.
///
/// `Debug` is derived: it prints the struct name and every field in declaration order, which is
/// exactly what the previous hand-written impl produced.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct IcebergCompactionTaskStatistics {
    /// Total bytes of data files to compact.
    pub total_data_file_size: u64,
    /// Number of data files to compact.
    pub total_data_file_count: u32,
    /// Total bytes of position-delete files to compact.
    pub total_pos_del_file_size: u64,
    /// Number of position-delete files to compact.
    pub total_pos_del_file_count: u32,
    /// Total bytes of equality-delete files to compact.
    pub total_eq_del_file_size: u64,
    /// Number of equality-delete files to compact.
    pub total_eq_del_file_count: u32,
}