risingwave_connector/sink/iceberg/
compaction.rs1use std::collections::HashMap;
16use std::pin::pin;
17use std::time::{Duration, Instant, SystemTime};
18
19use anyhow::{Context, anyhow};
20use aws_credential_types::provider::SharedCredentialsProvider;
21use aws_sdk_emrserverless::Client;
22use aws_sdk_emrserverless::types::builders::SparkSubmitBuilder;
23use aws_sdk_emrserverless::types::{JobDriver, JobRunState};
24use aws_types::region::Region;
25use futures::future::select;
26use itertools::Itertools;
27use thiserror_ext::AsReport;
28use tokio::sync::{mpsc, oneshot};
29use tokio::time::sleep;
30use tokio_retry::strategy::ExponentialBackoff;
31use tracing::{error, info, warn};
32
33use crate::sink::iceberg::IcebergConfig;
34
/// Settings that drive the Iceberg compaction worker and its EMR Serverless client.
///
/// All values are populated from environment variables by
/// [`IcebergCompactionConfig::from_env`].
pub struct IcebergCompactionConfig {
    /// Optional AWS region; when present it overrides the region of the AWS config loader.
    region: Option<String>,
    /// Optional static access key; only used when `secret_key` is present as well.
    access_key: Option<String>,
    /// Optional static secret key; only used when `access_key` is present as well.
    secret_key: Option<String>,
    /// Execution role ARN attached to every job run that is started.
    execution_role_arn: String,
    /// Identifier of the application that job runs are started on and polled from.
    application_id: String,
    /// Entry point handed to the Spark submit job driver.
    entrypoint: String,
    /// Number of pending commits that must accumulate before a compaction is attempted.
    compact_frequency: usize,
    /// Minimum time that must pass since the previous successful compaction
    /// before the next one is attempted. Parsed from a value given in seconds.
    min_compact_gap_duration_sec: Duration,
}
45
46impl IcebergCompactionConfig {
47 pub fn from_env() -> anyhow::Result<Self> {
48 use std::env::var;
49 Ok(IcebergCompactionConfig {
50 region: var("ICEBERG_COMPACTION_REGION").ok(),
51 access_key: var("ICEBERG_COMPACTION_ACCESS_KEY").ok(),
52 secret_key: var("ICEBERG_COMPACTION_SECRET_KEY").ok(),
53 execution_role_arn: var("ICEBERG_COMPACTION_EXECUTION_ROLE_ARN")
54 .map_err(|_| anyhow!("ICEBERG_COMPACTION_EXECUTION_ROLE_ARN not set in env var"))?,
55 application_id: var("ICEBERG_COMPACTION_APPLICATION_ID")
56 .map_err(|_| anyhow!("ICEBERG_COMPACTION_APPLICATION_ID not set in env var"))?,
57 entrypoint: var("ICEBERG_COMPACTION_ENTRYPOINT")
58 .map_err(|_| anyhow!("ICEBERG_COMPACTION_ENTRYPOINT not set in env var"))?,
59 compact_frequency: var("ICEBERG_COMPACTION_FREQUENCY")
60 .map_err(|_| anyhow!("ICEBERG_COMPACTION_FREQUENCY not set in env var"))?
61 .parse::<usize>()
62 .context("invalid ICEBERG_COMPACTION_FREQUENCY")?,
63 min_compact_gap_duration_sec: Duration::from_secs(
64 var("ICEBERG_MIN_COMPACTION_GAP_DURATION_SEC")
65 .map_err(|_| {
66 anyhow!("ICEBERG_MIN_COMPACTION_GAP_DURATION_SEC not set in env var")
67 })?
68 .parse::<u64>()
69 .context("invalid ICEBERG_MIN_COMPACTION_GAP_DURATION_SEC")?,
70 ),
71 })
72 }
73}
74
75async fn run_compact(
76 catalog: String,
77 database: String,
78 table: String,
79 catalog_config: HashMap<String, String>,
80 mut commit_rx: mpsc::UnboundedReceiver<()>,
81) {
82 let config = match IcebergCompactionConfig::from_env() {
83 Ok(config) => config,
84 Err(e) => {
85 error!(catalog, database, table, e = ?e.as_report(), "failed to start compact worker");
86 return;
87 }
88 };
89 let compact_frequency = config.compact_frequency;
90 let min_compact_gap_duration_sec = config.min_compact_gap_duration_sec;
91 let mut pending_commit_num = 0;
92 let client = IcebergCompactionClient::new(config).await;
93 let new_backoff = || {
94 ExponentialBackoff::from_millis(1000)
95 .factor(2)
96 .max_delay(Duration::from_secs(60))
97 };
98
99 let mut prev_compact_success_time = Instant::now();
100 let mut error_backoff = new_backoff();
101 let mut error_count = 0;
102
103 loop {
104 if pending_commit_num >= compact_frequency
105 && Instant::now().duration_since(prev_compact_success_time)
106 > min_compact_gap_duration_sec
107 {
108 let compact_start_time = Instant::now();
109 match client
110 .compact(
111 catalog.clone(),
112 database.clone(),
113 table.clone(),
114 &catalog_config,
115 )
116 .await
117 {
118 Err(e) => {
119 let backoff_duration = error_backoff.next().expect("should exist");
120 error_count += 1;
121 error!(
122 err = ?e.as_report(),
123 ?backoff_duration,
124 error_count,
125 catalog, database, table, "failed to compact"
126 );
127 sleep(backoff_duration).await;
128 }
129 _ => {
130 info!(catalog, database, table, elapsed = ?compact_start_time.elapsed(), "compact success");
131 pending_commit_num = 0;
132 error_backoff = new_backoff();
133 error_count = 0;
134 prev_compact_success_time = Instant::now();
135 }
136 }
137 }
138 if commit_rx.recv().await.is_some() {
139 pending_commit_num += 1;
140 } else {
141 break;
142 }
143 }
144}
145
146fn get_catalog_config(config: &IcebergConfig) -> anyhow::Result<HashMap<String, String>> {
147 match config.common.catalog_type.as_deref() {
148 Some("storage") | None => Ok(HashMap::from_iter([
149 ("type".to_owned(), "hadoop".to_owned()),
150 (
151 "warehouse".to_owned(),
152 config
153 .common
154 .warehouse_path
155 .clone()
156 .ok_or_else(|| anyhow!("warehouse unspecified for jdbc catalog"))?,
157 ),
158 ])),
159 Some("jdbc") => Ok(HashMap::from_iter(
160 [
161 ("type".to_owned(), "jdbc".to_owned()),
162 (
163 "warehouse".to_owned(),
164 config
165 .common
166 .warehouse_path
167 .clone()
168 .ok_or_else(|| anyhow!("warehouse unspecified for jdbc catalog"))?,
169 ),
170 (
171 "uri".to_owned(),
172 config
173 .common
174 .catalog_uri
175 .clone()
176 .ok_or_else(|| anyhow!("uri unspecified for jdbc catalog"))?,
177 ),
178 ]
179 .into_iter()
180 .chain(
181 config
182 .java_catalog_props
183 .iter()
184 .filter(|(key, _)| key.starts_with("jdbc."))
185 .map(|(k, v)| (k.clone(), v.clone())),
186 ),
187 )),
188 Some(other) => Err(anyhow!("unsupported catalog type {} in compaction", other)),
189 }
190}
191
192#[expect(dead_code)]
193pub fn spawn_compaction_client(
194 config: &IcebergConfig,
195) -> anyhow::Result<(mpsc::UnboundedSender<()>, oneshot::Sender<()>)> {
196 let catalog = config
197 .common
198 .catalog_name
199 .clone()
200 .ok_or_else(|| anyhow!("should specify catalog name"))?;
201 let database = config
202 .common
203 .database_name
204 .clone()
205 .ok_or_else(|| anyhow!("should specify database"))?;
206 let table = config.common.table_name.clone();
207 let (commit_tx, commit_rx) = mpsc::unbounded_channel();
208 let (finish_tx, finish_rx) = oneshot::channel();
209
210 let catalog_config = get_catalog_config(config)?;
211
212 let _join_handle = tokio::spawn(async move {
213 select(
214 finish_rx,
215 pin!(run_compact(
216 catalog.clone(),
217 database.clone(),
218 table.clone(),
219 catalog_config,
220 commit_rx,
221 )),
222 )
223 .await;
224 warn!(catalog, database, table, "compact worker exits");
225 });
226 Ok((commit_tx, finish_tx))
227}
228
229pub struct IcebergCompactionClient {
230 client: Client,
231 config: IcebergCompactionConfig,
232}
233
234impl IcebergCompactionClient {
235 pub async fn new(config: IcebergCompactionConfig) -> Self {
236 let config_loader = aws_config::from_env();
237 let config_loader = if let Some(region) = &config.region {
238 config_loader.region(Region::new(region.clone()))
239 } else {
240 config_loader
241 };
242 let config_loader = if let (Some(access_key), Some(secret_key)) =
243 (&config.access_key, &config.secret_key)
244 {
245 config_loader.credentials_provider(SharedCredentialsProvider::new(
246 aws_credential_types::Credentials::from_keys(
247 access_key.clone(),
248 secret_key.clone(),
249 None,
250 ),
251 ))
252 } else {
253 config_loader
254 };
255 let sdk_config = config_loader.load().await;
256 Self {
257 client: Client::new(&sdk_config),
258 config,
259 }
260 }
261
262 async fn wait_job_finish(&self, job_run_id: String) -> anyhow::Result<()> {
263 let start_time = Instant::now();
264 let success_job_run = loop {
265 let output = self
266 .client
267 .get_job_run()
268 .job_run_id(&job_run_id)
269 .application_id(self.config.application_id.clone())
270 .send()
271 .await?;
272 let job_run = output.job_run.ok_or_else(|| anyhow!("empty job run"))?;
273 match &job_run.state {
274 JobRunState::Cancelled | JobRunState::Cancelling | JobRunState::Failed => {
275 return Err(anyhow!(
276 "fail state: {}. Detailed: {}",
277 job_run.state,
278 job_run.state_details
279 ));
280 }
281 JobRunState::Pending
282 | JobRunState::Queued
283 | JobRunState::Running
284 | JobRunState::Scheduled
285 | JobRunState::Submitted => {
286 info!(
287 elapsed = ?start_time.elapsed(),
288 job_status = ?job_run.state,
289 "waiting job."
290 );
291 sleep(Duration::from_secs(5)).await;
292 }
293 JobRunState::Success => {
294 break job_run;
295 }
296 state => {
297 return Err(anyhow!("unhandled state: {:?}", state));
298 }
299 };
300 };
301 info!(
302 job_run_id,
303 details = success_job_run.state_details,
304 elapsed = ?start_time.elapsed(),
305 "job run finish"
306 );
307 Ok(())
308 }
309
310 pub async fn compact(
311 &self,
312 catalog: String,
313 db: String,
314 table: String,
315 catalog_config: &HashMap<String, String>,
316 ) -> anyhow::Result<()> {
317 let start_result = self
318 .client
319 .start_job_run()
320 .name(format!(
321 "job-run-{}",
322 SystemTime::now()
323 .duration_since(SystemTime::UNIX_EPOCH)
324 .unwrap()
325 .as_millis()
326 ))
327 .application_id(self.config.application_id.clone())
328 .execution_role_arn(self.config.execution_role_arn.clone())
329 .job_driver(JobDriver::SparkSubmit(
330 SparkSubmitBuilder::default()
331 .entry_point(self.config.entrypoint.clone())
332 .entry_point_arguments(catalog.clone())
333 .entry_point_arguments(db)
334 .entry_point_arguments(table)
335 .spark_submit_parameters(
336 catalog_config
337 .iter()
338 .map(|(key, value)| {
339 format!("--conf spark.sql.catalog.{}.{}={}", catalog, key, value)
340 })
341 .join(" "),
342 )
343 .build()
344 .unwrap(),
345 ))
346 .send()
347 .await
348 .context("start job")?;
349 info!(job_run_id = start_result.job_run_id, "job started");
350 self.wait_job_finish(start_result.job_run_id).await
351 }
352}
353
354#[tokio::test]
355#[ignore]
356async fn trigger_compaction() {
357 tracing_subscriber::fmt().init();
358 let warehouse = "s3://iceberg-spark/iceberg/";
359 let catalog = "catalog";
360 let db = "db";
361 let table = "table";
362
363 let config = IcebergCompactionConfig::from_env().unwrap();
364 let client = IcebergCompactionClient::new(config).await;
365 let result = client
366 .compact(
367 catalog.to_owned(),
368 db.to_owned(),
369 table.to_owned(),
370 &HashMap::from_iter([
371 ("type".to_owned(), "hadoop".to_owned()),
372 ("warehouse".to_owned(), warehouse.to_owned()),
373 ]),
374 )
375 .await;
376
377 info!(?result, "job result");
378}