risingwave_storage/hummock/
iceberg_compactor_runner.rs1use std::collections::BTreeMap;
16use std::fmt::Debug;
17use std::sync::atomic::{AtomicU32, Ordering};
18use std::sync::{Arc, LazyLock};
19
20use derive_builder::Builder;
21use iceberg::spec::MAIN_BRANCH;
22use iceberg::{Catalog, TableIdent};
23use iceberg_compaction_core::compaction::{
24 CommitConsistencyParams, CommitManagerRetryConfig, CompactionBuilder, CompactionPlan,
25 CompactionPlanner, CompactionResult, CompactionType,
26};
27use iceberg_compaction_core::config::{
28 CompactionBaseConfig, CompactionExecutionConfigBuilder, CompactionPlanningConfigBuilder,
29};
30use iceberg_compaction_core::executor::RewriteFilesStat;
31use mixtrics::registry::prometheus::PrometheusMetricsRegistry;
32use parquet::basic::Compression;
33use parquet::file::properties::WriterProperties;
34use risingwave_common::config::storage::default::storage::{
35 iceberg_compaction_enable_dynamic_size_estimation,
36 iceberg_compaction_enable_heuristic_output_parallelism,
37 iceberg_compaction_max_concurrent_closes, iceberg_compaction_size_estimation_smoothing_factor,
38};
39use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
40use risingwave_connector::sink::iceberg::{
41 IcebergConfig, commit_branch, should_enable_iceberg_cow,
42};
43use risingwave_pb::iceberg_compaction::IcebergCompactionTask;
44use risingwave_pb::iceberg_compaction::iceberg_compaction_task::TaskType;
45use thiserror_ext::AsReport;
46use tokio::sync::oneshot::Receiver;
47
48use super::HummockResult;
49use crate::hummock::HummockError;
50use crate::monitor::CompactorMetrics;
51
52static ICEBERG_COMPACTION_METRICS_REGISTRY: LazyLock<Box<PrometheusMetricsRegistry>> =
53 LazyLock::new(|| {
54 Box::new(PrometheusMetricsRegistry::new(
55 GLOBAL_METRICS_REGISTRY.clone(),
56 ))
57 });
58
59pub struct IcebergCompactorRunner {
60 pub task_id: u64,
61 pub catalog: Arc<dyn Catalog>,
62 pub table_ident: TableIdent,
63 pub iceberg_config: IcebergConfig,
64
65 config: IcebergCompactorRunnerConfig,
66 metrics: Arc<CompactorMetrics>,
67 pub task_type: TaskType,
68}
69
70pub fn default_writer_properties() -> WriterProperties {
71 WriterProperties::builder()
72 .set_compression(Compression::SNAPPY)
73 .set_created_by(concat!("risingwave version ", env!("CARGO_PKG_VERSION")).to_owned())
74 .build()
75}
76
77#[derive(Builder, Debug, Clone)]
78pub struct IcebergCompactorRunnerConfig {
79 #[builder(default = "4")]
80 pub max_parallelism: u32,
81 #[builder(default = "1024 * 1024 * 1024")] pub min_size_per_partition: u64,
83 #[builder(default = "32")]
84 pub max_file_count_per_partition: u32,
85 #[builder(default = "1024 * 1024 * 1024")] pub target_file_size_bytes: u64,
87 #[builder(default = "false")]
88 pub enable_validate_compaction: bool,
89 #[builder(default = "1024")]
90 pub max_record_batch_rows: usize,
91 #[builder(default = "default_writer_properties()")]
92 pub write_parquet_properties: WriterProperties,
93 #[builder(default = "32 * 1024 * 1024")] pub small_file_threshold: u64,
95 #[builder(default = "50 * 1024 * 1024 * 1024")] pub max_task_total_size: u64,
97 #[builder(default = "iceberg_compaction_enable_heuristic_output_parallelism()")]
98 pub enable_heuristic_output_parallelism: bool,
99 #[builder(default = "iceberg_compaction_max_concurrent_closes()")]
100 pub max_concurrent_closes: usize,
101 #[builder(default = "iceberg_compaction_enable_dynamic_size_estimation()")]
102 pub enable_dynamic_size_estimation: bool,
103 #[builder(default = "iceberg_compaction_size_estimation_smoothing_factor()")]
104 pub size_estimation_smoothing_factor: f64,
105}
106
/// Parallelism bookkeeping shared by concurrently running Iceberg compaction tasks.
///
/// Clones share the same `running_task_parallelism` counter.
#[derive(Debug, Clone)]
pub(crate) struct RunnerContext {
    /// Upper bound on the total parallelism that running tasks may hold.
    pub max_available_parallelism: u32,
    /// Parallelism currently held by running tasks.
    pub running_task_parallelism: Arc<AtomicU32>,
}

impl RunnerContext {
    pub fn new(max_available_parallelism: u32, running_task_parallelism: Arc<AtomicU32>) -> Self {
        Self {
            max_available_parallelism,
            running_task_parallelism,
        }
    }

    /// Returns whether `input_parallelism` more units fit under `max_available_parallelism`.
    ///
    /// Checking and reserving (`incr_running_task_parallelism`) are separate steps, so
    /// concurrent tasks can push the running total above the maximum. Saturating subtraction
    /// makes that state report "no capacity" instead of underflowing. Underflow would panic in
    /// debug builds and, in release builds, wrap to a huge value that admits every task.
    pub fn is_available_parallelism_sufficient(&self, input_parallelism: u32) -> bool {
        let running = self.running_task_parallelism.load(Ordering::SeqCst);
        self.max_available_parallelism.saturating_sub(running) >= input_parallelism
    }

    /// Adds `increment` to the shared running-parallelism counter.
    pub fn incr_running_task_parallelism(&self, increment: u32) {
        self.running_task_parallelism
            .fetch_add(increment, Ordering::SeqCst);
    }

    /// Subtracts `decrement` from the shared running-parallelism counter.
    pub fn decr_running_task_parallelism(&self, decrement: u32) {
        self.running_task_parallelism
            .fetch_sub(decrement, Ordering::SeqCst);
    }
}
136
137impl IcebergCompactorRunner {
138 pub async fn new(
139 iceberg_compaction_task: IcebergCompactionTask,
140 config: IcebergCompactorRunnerConfig,
141 metrics: Arc<CompactorMetrics>,
142 ) -> HummockResult<Self> {
143 let IcebergCompactionTask {
144 task_id,
145 props,
146 task_type,
147 } = iceberg_compaction_task;
148 let iceberg_config = IcebergConfig::from_btreemap(BTreeMap::from_iter(props.into_iter()))
149 .map_err(|e| HummockError::compaction_executor(e.as_report()))?;
150 let catalog = iceberg_config
151 .create_catalog()
152 .await
153 .map_err(|e| HummockError::compaction_executor(e.as_report()))?;
154 let table_ident = iceberg_config
155 .full_table_name()
156 .map_err(|e| HummockError::compaction_executor(e.as_report()))?;
157
158 Ok(Self {
159 task_id,
160 catalog,
161 table_ident,
162 iceberg_config,
163 config,
164 metrics,
165 task_type: TaskType::try_from(task_type).map_err(|e| {
166 HummockError::compaction_executor(format!("Invalid task type: {}", e.as_report()))
167 })?,
168 })
169 }
170
171 pub async fn compact(
172 self,
173 context: RunnerContext,
174 shutdown_rx: Receiver<()>,
175 ) -> HummockResult<()> {
176 let task_id = self.task_id;
177 let now = std::time::Instant::now();
178
179 let compact = async move {
180 let compaction_type = Self::get_compaction_type(self.task_type);
181 let planning_config = CompactionPlanningConfigBuilder::default()
182 .max_parallelism(self.config.max_parallelism as usize)
183 .min_size_per_partition(self.config.min_size_per_partition)
184 .max_file_count_per_partition(self.config.max_file_count_per_partition as _)
185 .base(CompactionBaseConfig {
186 target_file_size: self.config.target_file_size_bytes,
187 })
188 .enable_heuristic_output_parallelism(
189 self.config.enable_heuristic_output_parallelism,
190 )
191 .small_file_threshold(self.config.small_file_threshold)
192 .max_task_total_size(self.config.max_task_total_size)
193 .build()
194 .unwrap_or_else(|e| {
195 panic!(
196 "Failed to build iceberg compaction planning config: {:?}",
197 e.as_report()
198 );
199 });
200
201 let branch = commit_branch(
202 self.iceberg_config.r#type.as_str(),
203 self.iceberg_config.write_mode.as_str(),
204 );
205
206 let planner = CompactionPlanner::new(planning_config.clone());
207
208 let table = self
209 .catalog
210 .load_table(&self.table_ident)
211 .await
212 .map_err(|e| HummockError::compaction_executor(e.as_report()))?;
213
214 let compaction_plan = planner
215 .plan_compaction_with_branch(&table, compaction_type, &branch)
216 .await
217 .map_err(|e| HummockError::compaction_executor(e.as_report()))?;
218
219 let statistics = self.analyze_task_statistics(&compaction_plan);
220
221 let input_parallelism = compaction_plan.recommended_executor_parallelism() as u32;
222 let output_parallelism = compaction_plan.recommended_output_parallelism() as u32;
223
224 if !context.is_available_parallelism_sufficient(
225 compaction_plan.recommended_executor_parallelism() as _,
226 ) {
227 tracing::warn!(
228 task_id = task_id,
229 table = ?self.table_ident,
230 input_parallelism = input_parallelism,
231 "Available parallelism is less than input parallelism task will not run",
232 );
233 return Err(HummockError::compaction_executor(
234 "Available parallelism is less than input parallelism",
235 ));
236 }
237
238 let compaction_execution_config = CompactionExecutionConfigBuilder::default()
239 .enable_validate_compaction(self.config.enable_validate_compaction)
240 .max_record_batch_rows(self.config.max_record_batch_rows)
241 .write_parquet_properties(self.config.write_parquet_properties.clone())
242 .base(CompactionBaseConfig {
243 target_file_size: self.config.target_file_size_bytes,
244 })
245 .max_concurrent_closes(self.config.max_concurrent_closes)
246 .enable_dynamic_size_estimation(self.config.enable_dynamic_size_estimation)
247 .size_estimation_smoothing_factor(self.config.size_estimation_smoothing_factor)
248 .build()
249 .unwrap_or_else(|e| {
250 panic!(
251 "Failed to build iceberg compaction write props: {:?}",
252 e.as_report()
253 );
254 });
255
256 tracing::info!(
257 task_id = task_id,
258 task_type = ?self.task_type,
259 table = ?self.table_ident,
260 input_parallelism = input_parallelism,
261 output_parallelism = output_parallelism,
262 statistics = ?statistics,
263 planning_config = ?planning_config,
264 "Iceberg compaction task started",
265 );
266
267 let retry_config = CommitManagerRetryConfig::default();
268 let compaction = CompactionBuilder::new(
269 self.catalog.clone(),
270 self.table_ident.clone(),
271 CompactionType::Full,
272 )
273 .with_catalog_name(self.iceberg_config.catalog_name())
274 .with_executor_type(iceberg_compaction_core::executor::ExecutorType::DataFusion)
275 .with_registry(ICEBERG_COMPACTION_METRICS_REGISTRY.clone())
276 .with_retry_config(retry_config)
277 .with_to_branch(branch)
278 .build();
279
280 context.incr_running_task_parallelism(input_parallelism);
281 self.metrics.compact_task_pending_num.inc();
282 self.metrics
283 .compact_task_pending_parallelism
284 .add(input_parallelism as _);
285
286 let _release_guard = scopeguard::guard(
287 (input_parallelism, context.clone(), self.metrics.clone()), |(val, ctx, metrics_guard)| {
289 ctx.decr_running_task_parallelism(val);
290 metrics_guard.compact_task_pending_num.dec();
291 metrics_guard.compact_task_pending_parallelism.sub(val as _);
292 },
293 );
294
295 let CompactionResult {
296 data_files,
297 stats,
298 table,
299 } = compaction
300 .compact_with_plan(compaction_plan, &compaction_execution_config)
301 .await
302 .map_err(|e| HummockError::compaction_executor(e.as_report()))?;
303
304 if let Some(committed_table) = table
305 && should_enable_iceberg_cow(
306 self.iceberg_config.r#type.as_str(),
307 self.iceberg_config.write_mode.as_str(),
308 )
309 {
310 let ingestion_branch = commit_branch(
311 self.iceberg_config.r#type.as_str(),
312 self.iceberg_config.write_mode.as_str(),
313 );
314
315 let consistency_params = CommitConsistencyParams {
317 starting_snapshot_id: committed_table
318 .metadata()
319 .snapshot_for_ref(ingestion_branch.as_str())
320 .ok_or(HummockError::compaction_executor(anyhow::anyhow!(
321 "Don't find current_snapshot for ingestion_branch {}",
322 ingestion_branch
323 )))?
324 .snapshot_id(),
325 use_starting_sequence_number: true,
326 basic_schema_id: committed_table.metadata().current_schema().schema_id(),
327 };
328
329 let commit_manager = compaction.build_commit_manager(consistency_params);
330
331 let input_files = {
332 let mut input_files = vec![];
333 if let Some(snapshot) = committed_table.metadata().snapshot_for_ref(MAIN_BRANCH)
334 {
335 let manifest_list = snapshot
336 .load_manifest_list(
337 committed_table.file_io(),
338 committed_table.metadata(),
339 )
340 .await
341 .map_err(|e| HummockError::compaction_executor(e.as_report()))?;
342
343 for manifest_file in manifest_list
344 .entries()
345 .iter()
346 .filter(|entry| entry.has_added_files() || entry.has_existing_files())
347 {
348 let manifest = manifest_file
349 .load_manifest(committed_table.file_io())
350 .await
351 .map_err(|e| HummockError::compaction_executor(e.as_report()))?;
352 let (entry, _) = manifest.into_parts();
353 for i in entry {
354 match i.content_type() {
355 iceberg::spec::DataContentType::Data => {
356 input_files.push(i.data_file().clone());
357 }
358 iceberg::spec::DataContentType::EqualityDeletes => {
359 unreachable!(
360 "Equality deletes are not supported in main branch"
361 );
362 }
363 iceberg::spec::DataContentType::PositionDeletes => {
364 unreachable!(
365 "Position deletes are not supported in main branch"
366 );
367 }
368 }
369 }
370 }
371
372 input_files
373 } else {
374 vec![]
375 }
376 };
377
378 let _new_table = commit_manager
379 .overwrite_files(data_files, input_files, MAIN_BRANCH)
380 .await
381 .map_err(|e| HummockError::compaction_executor(e.as_report()))?;
382 }
383
384 Ok::<RewriteFilesStat, HummockError>(stats)
385 };
386
387 tokio::select! {
388 _ = shutdown_rx => {
389 tracing::info!(task_id = task_id, "Iceberg compaction task cancelled");
390 }
391 stat = compact => {
392 match stat {
393 Ok(stat) => {
394 tracing::info!(
395 task_id = task_id,
396 elapsed_millis = now.elapsed().as_millis(),
397 stat = ?stat,
398 "Iceberg compaction task finished",
399 );
400 }
401
402 Err(e) => {
403 tracing::warn!(
404 error = %e.as_report(),
405 task_id = task_id,
406 "Iceberg compaction task failed with error",
407 );
408 }
409 }
410 }
411 }
412
413 Ok(())
414 }
415
416 fn analyze_task_statistics(&self, plan: &CompactionPlan) -> IcebergCompactionTaskStatistics {
417 let mut total_data_file_size: u64 = 0;
418 let mut total_data_file_count = 0;
419 let mut total_pos_del_file_size: u64 = 0;
420 let mut total_pos_del_file_count = 0;
421 let mut total_eq_del_file_size: u64 = 0;
422 let mut total_eq_del_file_count = 0;
423
424 for data_file in &plan.files_to_compact.data_files {
425 total_data_file_size += data_file.file_size_in_bytes;
426 total_data_file_count += 1;
427 }
428
429 for pos_del_file in &plan.files_to_compact.position_delete_files {
430 total_pos_del_file_size += pos_del_file.file_size_in_bytes;
431 total_pos_del_file_count += 1;
432 }
433
434 for eq_del_file in &plan.files_to_compact.equality_delete_files {
435 total_eq_del_file_size += eq_del_file.file_size_in_bytes;
436 total_eq_del_file_count += 1;
437 }
438
439 IcebergCompactionTaskStatistics {
440 total_data_file_size,
441 total_data_file_count,
442 total_pos_del_file_size,
443 total_pos_del_file_count,
444 total_eq_del_file_size,
445 total_eq_del_file_count,
446 }
447 }
448
449 fn get_compaction_type(task_type: TaskType) -> CompactionType {
450 match task_type {
451 TaskType::SmallDataFileCompaction => CompactionType::MergeSmallDataFiles,
452 TaskType::FullCompaction => CompactionType::Full,
453 _ => {
454 unreachable!(
455 "Unexpected task type for Iceberg compaction: {:?}",
456 task_type
457 )
458 }
459 }
460 }
461}
462
/// Size and count totals for the input files of a compaction plan, split by file content
/// (data, position deletes, equality deletes).
///
/// `Debug` is derived. Its output is identical to the previous hand-written `debug_struct`
/// implementation: same struct name, same field names, same order.
#[derive(Debug, Clone, Default)]
pub struct IcebergCompactionTaskStatistics {
    /// Total bytes of data files.
    pub total_data_file_size: u64,
    /// Number of data files.
    pub total_data_file_count: u32,
    /// Total bytes of position-delete files.
    pub total_pos_del_file_size: u64,
    /// Number of position-delete files.
    pub total_pos_del_file_count: u32,
    /// Total bytes of equality-delete files.
    pub total_eq_del_file_size: u64,
    /// Number of equality-delete files.
    pub total_eq_del_file_count: u32,
}