risingwave_connector/sink/iceberg/commit_retry.rs
1// Copyright 2026 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Shared iceberg commit-with-retry primitive used by both the V1/V2 sink
16//! committer and the V3 sink coordinator worker. Wraps the standard pattern
17//! of "reload the table, build an action against the freshly-loaded snapshot,
18//! commit, retry on transient errors only".
19
20use std::future::Future;
21use std::sync::Arc;
22use std::time::Duration;
23
24use anyhow::{Result, anyhow, bail};
25use iceberg::table::Table;
26use iceberg::{Catalog, TableIdent};
27use thiserror_ext::AsReport;
28use tokio_retry::RetryIf;
29use tokio_retry::strategy::{ExponentialBackoff, jitter};
30
31/// Distinguishes retriable from non-retriable errors inside [`run_with_retry`].
32pub enum CommitError {
33 /// `reload_table` failed (table not found, schema mismatch, partition
34 /// evolution). Non-retriable — the call site's invariants no longer hold.
35 ReloadTable(anyhow::Error),
36 /// `Transaction::commit` (or its `apply`) failed. Retriable — likely a
37 /// commit conflict or transient network error.
38 Commit(anyhow::Error),
39}
40
41/// Reload the iceberg table from the catalog and assert that its current
42/// `schema_id` and `default_partition_spec_id` still match the values the
43/// caller computed against. Schema or partition evolution mid-commit is
44/// surfaced as a non-retriable error by the call sites.
45pub async fn reload_table(
46 catalog: &dyn Catalog,
47 table_ident: &TableIdent,
48 schema_id: i32,
49 partition_spec_id: i32,
50) -> Result<Table> {
51 let table = catalog
52 .load_table(table_ident)
53 .await
54 .map_err(|e| anyhow!(e).context("reload iceberg table"))?;
55 if table.metadata().current_schema_id() != schema_id {
56 bail!(
57 "iceberg sink: schema evolution not supported; expect schema id {}, got {}",
58 schema_id,
59 table.metadata().current_schema_id(),
60 );
61 }
62 if table.metadata().default_partition_spec_id() != partition_spec_id {
63 bail!(
64 "iceberg sink: partition evolution not supported; expect partition spec id {}, got {}",
65 partition_spec_id,
66 table.metadata().default_partition_spec_id(),
67 );
68 }
69 Ok(table)
70}
71
72/// Run a commit-action against the given iceberg table with retry.
73/// 1. Calls `reload_table` before each commit attempt to get the latest metadata
74/// 2. If `reload_table` fails (table not exists/schema/partition mismatch), stops retrying immediately
75/// 3. If commit fails, retries with backoff up to `retry_num` times.
76///
77/// Strategy: exponential backoff 10ms→60s with jitter, up to `retry_num` retries.
78pub async fn run_with_retry<F, Fut, Out>(
79 catalog: Arc<dyn Catalog>,
80 table_ident: TableIdent,
81 schema_id: i32,
82 partition_spec_id: i32,
83 retry_num: usize,
84 commit_action: F,
85) -> Result<Out>
86where
87 F: Fn(Table) -> Fut + Send + Sync,
88 Fut: Future<Output = Result<Out, CommitError>> + Send,
89{
90 let retry_strategy = ExponentialBackoff::from_millis(10)
91 .max_delay(Duration::from_secs(60))
92 .map(jitter)
93 .take(retry_num);
94
95 RetryIf::spawn(
96 retry_strategy,
97 || {
98 let catalog = catalog.clone();
99 let table_ident = table_ident.clone();
100 let commit_action = &commit_action;
101 async move {
102 let table =
103 reload_table(catalog.as_ref(), &table_ident, schema_id, partition_spec_id)
104 .await
105 .map_err(CommitError::ReloadTable)?;
106 commit_action(table).await
107 }
108 },
109 |err: &CommitError| match err {
110 CommitError::Commit(e) => {
111 tracing::warn!(
112 error = %e.as_report(),
113 "iceberg commit failed; will retry",
114 );
115 true
116 }
117 CommitError::ReloadTable(e) => {
118 tracing::error!(
119 error = %e.as_report(),
120 "iceberg reload_table failed; will not retry",
121 );
122 false
123 }
124 },
125 )
126 .await
127 .map_err(|e| match e {
128 CommitError::ReloadTable(e) | CommitError::Commit(e) => e,
129 })
130}