Skip to main content

risingwave_connector/sink/iceberg/
commit_retry.rs

1// Copyright 2026 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Shared iceberg commit-with-retry primitive used by both the V1/V2 sink
16//! committer and the V3 sink coordinator worker. Wraps the standard pattern
17//! of "reload the table, build an action against the freshly-loaded snapshot,
18//! commit, retry on transient errors only".
19
20use std::future::Future;
21use std::sync::Arc;
22use std::time::Duration;
23
24use anyhow::{Result, anyhow, bail};
25use iceberg::table::Table;
26use iceberg::{Catalog, TableIdent};
27use thiserror_ext::AsReport;
28use tokio_retry::RetryIf;
29use tokio_retry::strategy::{ExponentialBackoff, jitter};
30
31/// Distinguishes retriable from non-retriable errors inside [`run_with_retry`].
32pub enum CommitError {
33    /// `reload_table` failed (table not found, schema mismatch, partition
34    /// evolution). Non-retriable — the call site's invariants no longer hold.
35    ReloadTable(anyhow::Error),
36    /// `Transaction::commit` (or its `apply`) failed. Retriable — likely a
37    /// commit conflict or transient network error.
38    Commit(anyhow::Error),
39}
40
41/// Reload the iceberg table from the catalog and assert that its current
42/// `schema_id` and `default_partition_spec_id` still match the values the
43/// caller computed against. Schema or partition evolution mid-commit is
44/// surfaced as a non-retriable error by the call sites.
45pub async fn reload_table(
46    catalog: &dyn Catalog,
47    table_ident: &TableIdent,
48    schema_id: i32,
49    partition_spec_id: i32,
50) -> Result<Table> {
51    let table = catalog
52        .load_table(table_ident)
53        .await
54        .map_err(|e| anyhow!(e).context("reload iceberg table"))?;
55    if table.metadata().current_schema_id() != schema_id {
56        bail!(
57            "iceberg sink: schema evolution not supported; expect schema id {}, got {}",
58            schema_id,
59            table.metadata().current_schema_id(),
60        );
61    }
62    if table.metadata().default_partition_spec_id() != partition_spec_id {
63        bail!(
64            "iceberg sink: partition evolution not supported; expect partition spec id {}, got {}",
65            partition_spec_id,
66            table.metadata().default_partition_spec_id(),
67        );
68    }
69    Ok(table)
70}
71
72/// Run a commit-action against the given iceberg table with retry.
73/// 1. Calls `reload_table` before each commit attempt to get the latest metadata
74/// 2. If `reload_table` fails (table not exists/schema/partition mismatch), stops retrying immediately
75/// 3. If commit fails, retries with backoff up to `retry_num` times.
76///
77/// Strategy: exponential backoff 10ms→60s with jitter, up to `retry_num` retries.
78pub async fn run_with_retry<F, Fut, Out>(
79    catalog: Arc<dyn Catalog>,
80    table_ident: TableIdent,
81    schema_id: i32,
82    partition_spec_id: i32,
83    retry_num: usize,
84    commit_action: F,
85) -> Result<Out>
86where
87    F: Fn(Table) -> Fut + Send + Sync,
88    Fut: Future<Output = Result<Out, CommitError>> + Send,
89{
90    let retry_strategy = ExponentialBackoff::from_millis(10)
91        .max_delay(Duration::from_secs(60))
92        .map(jitter)
93        .take(retry_num);
94
95    RetryIf::spawn(
96        retry_strategy,
97        || {
98            let catalog = catalog.clone();
99            let table_ident = table_ident.clone();
100            let commit_action = &commit_action;
101            async move {
102                let table =
103                    reload_table(catalog.as_ref(), &table_ident, schema_id, partition_spec_id)
104                        .await
105                        .map_err(CommitError::ReloadTable)?;
106                commit_action(table).await
107            }
108        },
109        |err: &CommitError| match err {
110            CommitError::Commit(e) => {
111                tracing::warn!(
112                    error = %e.as_report(),
113                    "iceberg commit failed; will retry",
114                );
115                true
116            }
117            CommitError::ReloadTable(e) => {
118                tracing::error!(
119                    error = %e.as_report(),
120                    "iceberg reload_table failed; will not retry",
121                );
122                false
123            }
124        },
125    )
126    .await
127    .map_err(|e| match e {
128        CommitError::ReloadTable(e) | CommitError::Commit(e) => e,
129    })
130}