risingwave_connector/sink/iceberg/
compaction.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::pin::pin;
17use std::time::{Duration, Instant, SystemTime};
18
19use anyhow::{Context, anyhow};
20use aws_credential_types::provider::SharedCredentialsProvider;
21use aws_sdk_emrserverless::Client;
22use aws_sdk_emrserverless::types::builders::SparkSubmitBuilder;
23use aws_sdk_emrserverless::types::{JobDriver, JobRunState};
24use aws_types::region::Region;
25use futures::future::select;
26use itertools::Itertools;
27use thiserror_ext::AsReport;
28use tokio::sync::{mpsc, oneshot};
29use tokio::time::sleep;
30use tokio_retry::strategy::ExponentialBackoff;
31use tracing::{error, info, warn};
32
33use crate::sink::iceberg::IcebergConfig;
34
/// Settings for the EMR Serverless job that compacts an Iceberg table.
///
/// All values are populated from environment variables by
/// [`IcebergCompactionConfig::from_env`].
pub struct IcebergCompactionConfig {
    /// Optional AWS region, read from `ICEBERG_COMPACTION_REGION`.
    region: Option<String>,
    /// Optional static access key, read from `ICEBERG_COMPACTION_ACCESS_KEY`.
    /// Only used when `secret_key` is present as well.
    access_key: Option<String>,
    /// Optional static secret key, read from `ICEBERG_COMPACTION_SECRET_KEY`.
    /// Only used when `access_key` is present as well.
    secret_key: Option<String>,
    /// Execution role ARN passed to every started job run.
    execution_role_arn: String,
    /// Identifier of the EMR Serverless application that runs the job.
    application_id: String,
    /// Spark entry point submitted as the job driver.
    entrypoint: String,
    /// Number of pending commits required before a compaction is triggered.
    compact_frequency: usize,
    /// Minimum time that must pass after a successful compaction before the
    /// next one starts. Parsed from a number of seconds, stored as a `Duration`.
    min_compact_gap_duration_sec: Duration,
}
45
46impl IcebergCompactionConfig {
47    pub fn from_env() -> anyhow::Result<Self> {
48        use std::env::var;
49        Ok(IcebergCompactionConfig {
50            region: var("ICEBERG_COMPACTION_REGION").ok(),
51            access_key: var("ICEBERG_COMPACTION_ACCESS_KEY").ok(),
52            secret_key: var("ICEBERG_COMPACTION_SECRET_KEY").ok(),
53            execution_role_arn: var("ICEBERG_COMPACTION_EXECUTION_ROLE_ARN")
54                .map_err(|_| anyhow!("ICEBERG_COMPACTION_EXECUTION_ROLE_ARN not set in env var"))?,
55            application_id: var("ICEBERG_COMPACTION_APPLICATION_ID")
56                .map_err(|_| anyhow!("ICEBERG_COMPACTION_APPLICATION_ID not set in env var"))?,
57            entrypoint: var("ICEBERG_COMPACTION_ENTRYPOINT")
58                .map_err(|_| anyhow!("ICEBERG_COMPACTION_ENTRYPOINT not set in env var"))?,
59            compact_frequency: var("ICEBERG_COMPACTION_FREQUENCY")
60                .map_err(|_| anyhow!("ICEBERG_COMPACTION_FREQUENCY not set in env var"))?
61                .parse::<usize>()
62                .context("invalid ICEBERG_COMPACTION_FREQUENCY")?,
63            min_compact_gap_duration_sec: Duration::from_secs(
64                var("ICEBERG_MIN_COMPACTION_GAP_DURATION_SEC")
65                    .map_err(|_| {
66                        anyhow!("ICEBERG_MIN_COMPACTION_GAP_DURATION_SEC not set in env var")
67                    })?
68                    .parse::<u64>()
69                    .context("invalid ICEBERG_MIN_COMPACTION_GAP_DURATION_SEC")?,
70            ),
71        })
72    }
73}
74
75async fn run_compact(
76    catalog: String,
77    database: String,
78    table: String,
79    catalog_config: HashMap<String, String>,
80    mut commit_rx: mpsc::UnboundedReceiver<()>,
81) {
82    let config = match IcebergCompactionConfig::from_env() {
83        Ok(config) => config,
84        Err(e) => {
85            error!(catalog, database, table, e = ?e.as_report(), "failed to start compact worker");
86            return;
87        }
88    };
89    let compact_frequency = config.compact_frequency;
90    let min_compact_gap_duration_sec = config.min_compact_gap_duration_sec;
91    let mut pending_commit_num = 0;
92    let client = IcebergCompactionClient::new(config).await;
93    let new_backoff = || {
94        ExponentialBackoff::from_millis(1000)
95            .factor(2)
96            .max_delay(Duration::from_secs(60))
97    };
98
99    let mut prev_compact_success_time = Instant::now();
100    let mut error_backoff = new_backoff();
101    let mut error_count = 0;
102
103    loop {
104        if pending_commit_num >= compact_frequency
105            && Instant::now().duration_since(prev_compact_success_time)
106                > min_compact_gap_duration_sec
107        {
108            let compact_start_time = Instant::now();
109            match client
110                .compact(
111                    catalog.clone(),
112                    database.clone(),
113                    table.clone(),
114                    &catalog_config,
115                )
116                .await
117            {
118                Err(e) => {
119                    let backoff_duration = error_backoff.next().expect("should exist");
120                    error_count += 1;
121                    error!(
122                        err = ?e.as_report(),
123                        ?backoff_duration,
124                        error_count,
125                        catalog, database, table, "failed to compact"
126                    );
127                    sleep(backoff_duration).await;
128                }
129                _ => {
130                    info!(catalog, database, table, elapsed = ?compact_start_time.elapsed(),  "compact success");
131                    pending_commit_num = 0;
132                    error_backoff = new_backoff();
133                    error_count = 0;
134                    prev_compact_success_time = Instant::now();
135                }
136            }
137        }
138        if commit_rx.recv().await.is_some() {
139            pending_commit_num += 1;
140        } else {
141            break;
142        }
143    }
144}
145
146fn get_catalog_config(config: &IcebergConfig) -> anyhow::Result<HashMap<String, String>> {
147    match config.common.catalog_type.as_deref() {
148        Some("storage") | None => Ok(HashMap::from_iter([
149            ("type".to_owned(), "hadoop".to_owned()),
150            (
151                "warehouse".to_owned(),
152                config
153                    .common
154                    .warehouse_path
155                    .clone()
156                    .ok_or_else(|| anyhow!("warehouse unspecified for jdbc catalog"))?,
157            ),
158        ])),
159        Some("jdbc") => Ok(HashMap::from_iter(
160            [
161                ("type".to_owned(), "jdbc".to_owned()),
162                (
163                    "warehouse".to_owned(),
164                    config
165                        .common
166                        .warehouse_path
167                        .clone()
168                        .ok_or_else(|| anyhow!("warehouse unspecified for jdbc catalog"))?,
169                ),
170                (
171                    "uri".to_owned(),
172                    config
173                        .common
174                        .catalog_uri
175                        .clone()
176                        .ok_or_else(|| anyhow!("uri unspecified for jdbc catalog"))?,
177                ),
178            ]
179            .into_iter()
180            .chain(
181                config
182                    .java_catalog_props
183                    .iter()
184                    .filter(|(key, _)| key.starts_with("jdbc."))
185                    .map(|(k, v)| (k.clone(), v.clone())),
186            ),
187        )),
188        Some(other) => Err(anyhow!("unsupported catalog type {} in compaction", other)),
189    }
190}
191
192#[expect(dead_code)]
193pub fn spawn_compaction_client(
194    config: &IcebergConfig,
195) -> anyhow::Result<(mpsc::UnboundedSender<()>, oneshot::Sender<()>)> {
196    let catalog = config
197        .common
198        .catalog_name
199        .clone()
200        .ok_or_else(|| anyhow!("should specify catalog name"))?;
201    let database = config
202        .common
203        .database_name
204        .clone()
205        .ok_or_else(|| anyhow!("should specify database"))?;
206    let table = config.common.table_name.clone();
207    let (commit_tx, commit_rx) = mpsc::unbounded_channel();
208    let (finish_tx, finish_rx) = oneshot::channel();
209
210    let catalog_config = get_catalog_config(config)?;
211
212    let _join_handle = tokio::spawn(async move {
213        select(
214            finish_rx,
215            pin!(run_compact(
216                catalog.clone(),
217                database.clone(),
218                table.clone(),
219                catalog_config,
220                commit_rx,
221            )),
222        )
223        .await;
224        warn!(catalog, database, table, "compact worker exits");
225    });
226    Ok((commit_tx, finish_tx))
227}
228
229pub struct IcebergCompactionClient {
230    client: Client,
231    config: IcebergCompactionConfig,
232}
233
234impl IcebergCompactionClient {
235    pub async fn new(config: IcebergCompactionConfig) -> Self {
236        let config_loader = aws_config::from_env();
237        let config_loader = if let Some(region) = &config.region {
238            config_loader.region(Region::new(region.clone()))
239        } else {
240            config_loader
241        };
242        let config_loader = if let (Some(access_key), Some(secret_key)) =
243            (&config.access_key, &config.secret_key)
244        {
245            config_loader.credentials_provider(SharedCredentialsProvider::new(
246                aws_credential_types::Credentials::from_keys(
247                    access_key.clone(),
248                    secret_key.clone(),
249                    None,
250                ),
251            ))
252        } else {
253            config_loader
254        };
255        let sdk_config = config_loader.load().await;
256        Self {
257            client: Client::new(&sdk_config),
258            config,
259        }
260    }
261
262    async fn wait_job_finish(&self, job_run_id: String) -> anyhow::Result<()> {
263        let start_time = Instant::now();
264        let success_job_run = loop {
265            let output = self
266                .client
267                .get_job_run()
268                .job_run_id(&job_run_id)
269                .application_id(self.config.application_id.clone())
270                .send()
271                .await?;
272            let job_run = output.job_run.ok_or_else(|| anyhow!("empty job run"))?;
273            match &job_run.state {
274                JobRunState::Cancelled | JobRunState::Cancelling | JobRunState::Failed => {
275                    return Err(anyhow!(
276                        "fail state: {}. Detailed: {}",
277                        job_run.state,
278                        job_run.state_details
279                    ));
280                }
281                JobRunState::Pending
282                | JobRunState::Queued
283                | JobRunState::Running
284                | JobRunState::Scheduled
285                | JobRunState::Submitted => {
286                    info!(
287                        elapsed = ?start_time.elapsed(),
288                        job_status = ?job_run.state,
289                        "waiting job."
290                    );
291                    sleep(Duration::from_secs(5)).await;
292                }
293                JobRunState::Success => {
294                    break job_run;
295                }
296                state => {
297                    return Err(anyhow!("unhandled state: {:?}", state));
298                }
299            };
300        };
301        info!(
302            job_run_id,
303            details = success_job_run.state_details,
304            elapsed = ?start_time.elapsed(),
305            "job run finish"
306        );
307        Ok(())
308    }
309
310    pub async fn compact(
311        &self,
312        catalog: String,
313        db: String,
314        table: String,
315        catalog_config: &HashMap<String, String>,
316    ) -> anyhow::Result<()> {
317        let start_result = self
318            .client
319            .start_job_run()
320            .name(format!(
321                "job-run-{}",
322                SystemTime::now()
323                    .duration_since(SystemTime::UNIX_EPOCH)
324                    .unwrap()
325                    .as_millis()
326            ))
327            .application_id(self.config.application_id.clone())
328            .execution_role_arn(self.config.execution_role_arn.clone())
329            .job_driver(JobDriver::SparkSubmit(
330                SparkSubmitBuilder::default()
331                    .entry_point(self.config.entrypoint.clone())
332                    .entry_point_arguments(catalog.clone())
333                    .entry_point_arguments(db)
334                    .entry_point_arguments(table)
335                    .spark_submit_parameters(
336                        catalog_config
337                            .iter()
338                            .map(|(key, value)| {
339                                format!("--conf spark.sql.catalog.{}.{}={}", catalog, key, value)
340                            })
341                            .join(" "),
342                    )
343                    .build()
344                    .unwrap(),
345            ))
346            .send()
347            .await
348            .context("start job")?;
349        info!(job_run_id = start_result.job_run_id, "job started");
350        self.wait_job_finish(start_result.job_run_id).await
351    }
352}
353
354#[tokio::test]
355#[ignore]
356async fn trigger_compaction() {
357    tracing_subscriber::fmt().init();
358    let warehouse = "s3://iceberg-spark/iceberg/";
359    let catalog = "catalog";
360    let db = "db";
361    let table = "table";
362
363    let config = IcebergCompactionConfig::from_env().unwrap();
364    let client = IcebergCompactionClient::new(config).await;
365    let result = client
366        .compact(
367            catalog.to_owned(),
368            db.to_owned(),
369            table.to_owned(),
370            &HashMap::from_iter([
371                ("type".to_owned(), "hadoop".to_owned()),
372                ("warehouse".to_owned(), warehouse.to_owned()),
373            ]),
374        )
375        .await;
376
377    info!(?result, "job result");
378}