Skip to main content

risingwave_meta/manager/iceberg_v3_sink/
coordinator.rs

1// Copyright 2026 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Per-sink Iceberg V3 commit coordinator. This is a plain struct (no background task / mpsc): the
16//! [`crate::manager::iceberg_v3_sink::IcebergV3SinkManager`] holds one per registered sink behind a
17//! per-sink async mutex and calls [`IcebergV3Coordinator::pre_commit`] / [`IcebergV3Coordinator::commit`]
18//! directly from the barrier-completion path. The barrier path already serializes pre-commit/commit per
19//! epoch, so the coordinator never needs its own request queue.
20//!
//! [`IcebergV3Coordinator::init`] is synchronous with respect to registration: it loads the iceberg
//! catalog/table, reads `pending_sink_state`, and drains every recovered pending epoch (in epoch
//! order, one iceberg `overwrite_files` transaction per epoch) BEFORE returning a ready coordinator.
//! Only once init completes does the sink start accepting live pre-commit/commit calls.
25//!
26//! The two phases dispatched from `complete_barrier`:
27//!
28//! 1. `pre_commit` — aggregate the reports, generate a `snapshot_id`, persist the merged file list under
29//!    `pending_sink_state`, return. No iceberg I/O.
30//! 2. `commit` — run an iceberg `overwrite_files` transaction (keyed on the pre-generated `snapshot_id`
31//!    for idempotency), then mark the row Committed and prune the prior epoch's row.
32//!
33//! On retry-exhausted commit failure the error propagates to the caller; the barrier then fails and the
34//! meta-recovery path drops the coordinator, re-registers it (re-running `init`), and retries from the
35//! persisted `pending_sink_state` rows.
36
37use std::sync::Arc;
38use std::time::Duration;
39
40use anyhow::{Context, Result, anyhow, bail};
41use iceberg::Catalog;
42use iceberg::spec::{DataFile, FormatVersion, SerializedDataFile};
43use iceberg::table::Table;
44use iceberg::transaction::{ApplyTransactionAction, FastAppendAction, Transaction};
45use prost::Message;
46use risingwave_connector::sink::catalog::SinkId;
47use risingwave_connector::sink::iceberg::commit_retry::{self, CommitError};
48use risingwave_connector::sink::iceberg::{
49    IcebergCommitResult, IcebergConfig, IcebergDvMergerCommitResult, commit_branch,
50};
51use risingwave_meta_model::pending_sink_state::SinkState;
52use risingwave_pb::connector_service::PbIcebergV3PreCommitState;
53use risingwave_pb::stream_service::PbIcebergV3SinkRole;
54use risingwave_pb::stream_service::barrier_complete_response::PbIcebergV3SinkMetadata;
55use sea_orm::DatabaseConnection;
56use serde::{Deserialize, Serialize};
57use thiserror_ext::AsReport;
58use tokio::time::timeout;
59
60use super::backfill::backfill_dv_partitions;
61use crate::manager::exactly_once_util::{
62    clean_aborted_records, commit_and_prune_epoch, list_sink_states_ordered_by_epoch,
63    persist_pre_commit_metadata,
64};
65
/// Upper bound on loading the iceberg catalog/table inside [`IcebergV3Coordinator::init`], so a
/// register call can't hang forever if the iceberg endpoint is unreachable. On timeout `init`
/// returns an error and registration fails (the caller surfaces it / retries).
///
/// NOTE: only the catalog/table load is wrapped in this timeout. Reading `pending_sink_state` and
/// draining recovered epochs to iceberg later in `init` are NOT bounded by it.
const INIT_TIMEOUT: Duration = Duration::from_secs(60);
69
/// One epoch's pre-committed state: the decoded merged file list plus the `snapshot_id` generated at
/// pre-commit time. It is parked in `IcebergV3Coordinator::waiting_commit` between `pre_commit` and
/// `commit`, and rebuilt from `pending_sink_state` rows during recovery. The blob form
/// ([`PbIcebergV3PreCommitState`]) is only materialized when persisting to `pending_sink_state`;
/// in memory we keep the structured form.
#[derive(Clone)]
struct EpochCommit {
    /// Epoch whose writes this entry commits (the `prev_epoch` passed to `pre_commit`).
    epoch: u64,
    /// Aggregated data / delete / overwrite files; behind an `Arc` so each commit attempt can
    /// clone it cheaply.
    merged: Arc<IcebergV3AggResult>,
    /// Snapshot id pinned on the iceberg transaction. If the table already contains a snapshot
    /// with this id, the commit is treated as already applied and skipped.
    snapshot_id: i64,
}
79
/// Per-sink Iceberg V3 commit coordinator. Owns the loaded iceberg catalog/table (reused across commits)
/// and the meta SQL connection (used for `pending_sink_state` exactly-once persistence).
pub struct IcebergV3Coordinator {
    /// Sink this coordinator commits for; key for `pending_sink_state` rows and used in errors.
    sink_id: SinkId,
    /// Meta store connection used to persist, commit and prune `pending_sink_state` rows.
    db: DatabaseConnection,
    /// Iceberg catalog that commit transactions are submitted through.
    catalog: Arc<dyn Catalog>,
    /// Latest known table handle; replaced by the table returned from each successful commit.
    table: Table,
    /// Branch every commit targets, derived from the sink type and write mode via `commit_branch`.
    target_branch: String,
    /// Retry budget handed to `commit_retry::run_with_retry` (from the config's `commit_retry_num`).
    retry_num: usize,
    /// The epoch pre-committed but not yet committed, carried from `pre_commit` to the next `commit`.
    waiting_commit: Option<EpochCommit>,
    /// Most recent epoch marked Committed; passed to `commit_and_prune_epoch` so the prior epoch's
    /// row can be pruned.
    prev_committed_epoch: Option<u64>,
}
93
94impl IcebergV3Coordinator {
95    /// Build a ready-to-serve coordinator: load the iceberg catalog/table, recover any persisted pending
96    /// state, and drain recovered pending epochs to iceberg. Returns only once recovery is complete, so a
97    /// successful return means the sink is ready to accept live pre-commit/commit calls.
98    pub async fn init(
99        sink_id: SinkId,
100        iceberg_config: IcebergConfig,
101        db: DatabaseConnection,
102    ) -> Result<Self> {
103        let (catalog, table) = timeout(INIT_TIMEOUT, load_catalog_and_table(&iceberg_config))
104            .await
105            .map_err(|_| {
106                anyhow!(
107                    "iceberg v3 coordinator for sink {} timed out after {}s loading iceberg catalog/table",
108                    sink_id,
109                    INIT_TIMEOUT.as_secs()
110                )
111            })?
112            .with_context(|| format!("init iceberg v3 coordinator for sink {}", sink_id))?;
113
114        let (prev_committed_epoch, recovered) = recovery(&db, sink_id)
115            .await
116            .with_context(|| format!("recover pending state for iceberg v3 sink {}", sink_id))?;
117
118        let target_branch =
119            commit_branch(iceberg_config.r#type.as_str(), iceberg_config.write_mode);
120
121        let mut coordinator = Self {
122            sink_id,
123            db,
124            catalog,
125            table,
126            target_branch,
127            retry_num: iceberg_config.commit_retry_num as usize,
128            waiting_commit: None,
129            prev_committed_epoch,
130        };
131
132        for commit in recovered {
133            coordinator.waiting_commit = Some(commit);
134            coordinator
135                .commit()
136                .await
137                .with_context(|| format!("drain recovered pending epoch for sink {}", sink_id))?;
138        }
139
140        Ok(coordinator)
141    }
142
143    pub async fn pre_commit(
144        &mut self,
145        prev_epoch: u64,
146        reports: Vec<PbIcebergV3SinkMetadata>,
147    ) -> Result<()> {
148        if reports.iter().all(|r| r.metadata.is_none()) {
149            return Ok(());
150        }
151
152        let merged = aggregate_reports(&reports)?;
153        if merged.data_files.is_empty() && merged.delete_files.is_empty() {
154            bail!("v3 sink epoch {} has no data files to commit", prev_epoch);
155        }
156        let merged = Arc::new(self.backfill_dv_partitions(merged)?);
157
158        let snapshot_id = FastAppendAction::generate_snapshot_id(&self.table);
159        let blob = encode_pre_commit_state(&merged, snapshot_id)?;
160        persist_pre_commit_metadata(&self.db, self.sink_id, prev_epoch, Some(blob), None).await?;
161
162        self.waiting_commit = Some(EpochCommit {
163            epoch: prev_epoch,
164            merged,
165            snapshot_id,
166        });
167        Ok(())
168    }
169
170    pub async fn commit(&mut self) -> Result<()> {
171        let Some(commit) = self.waiting_commit.take() else {
172            return Ok(());
173        };
174
175        let refreshed_table = commit_one_epoch(
176            self.catalog.clone(),
177            self.table.identifier().clone(),
178            self.target_branch.clone(),
179            &commit,
180            self.retry_num,
181        )
182        .await
183        .map_err(|err| {
184            let err_report = match err {
185                CommitError::Commit(e) | CommitError::ReloadTable(e) => e,
186            };
187            anyhow!(err_report).context(format!(
188                "iceberg v3 commit failed for sink {} epoch {}",
189                self.sink_id, commit.epoch
190            ))
191        })?;
192        self.table = refreshed_table;
193
194        commit_and_prune_epoch(
195            &self.db,
196            self.sink_id,
197            commit.epoch,
198            self.prev_committed_epoch,
199        )
200        .await
201        .with_context(|| {
202            format!(
203                "iceberg v3 mark_committed failed for sink {} epoch {}",
204                self.sink_id, commit.epoch
205            )
206        })?;
207
208        self.prev_committed_epoch = Some(commit.epoch);
209        Ok(())
210    }
211
212    fn backfill_dv_partitions(&self, merged: IcebergV3AggResult) -> Result<IcebergV3AggResult> {
213        let partition_spec = self
214            .table
215            .metadata()
216            .partition_spec_by_id(merged.partition_spec_id)
217            .context("find partition spec for v3 commit")?;
218        if partition_spec.is_unpartitioned() {
219            return Ok(merged);
220        }
221
222        let schema = self.table.metadata().current_schema();
223        let partition_type = partition_spec.partition_type(schema)?;
224        let data_files = merged
225            .data_files
226            .clone()
227            .into_iter()
228            .map(|f| f.try_into(merged.partition_spec_id, &partition_type, schema))
229            .try_collect::<Vec<_>>()?;
230        let mut delete_files = merged
231            .delete_files
232            .into_iter()
233            .map(|f| f.try_into(merged.partition_spec_id, &partition_type, schema))
234            .try_collect::<Vec<_>>()?;
235        backfill_dv_partitions(&data_files, &mut delete_files)?;
236        let delete_files = delete_files
237            .into_iter()
238            .map(|f| SerializedDataFile::try_from(f, &partition_type, FormatVersion::V3))
239            .try_collect()?;
240
241        Ok(IcebergV3AggResult {
242            schema_id: merged.schema_id,
243            partition_spec_id: merged.partition_spec_id,
244            data_files: merged.data_files,
245            delete_files,
246            overwrite_files: merged.overwrite_files,
247        })
248    }
249}
250
251async fn load_catalog_and_table(
252    iceberg_config: &IcebergConfig,
253) -> Result<(Arc<dyn Catalog>, Table)> {
254    let catalog = iceberg_config
255        .create_catalog()
256        .await
257        .map_err(|e| anyhow!(e).context("create iceberg catalog for v3 sink"))?;
258    let table = iceberg_config
259        .load_table()
260        .await
261        .map_err(|e| anyhow!(e).context("load iceberg table for v3 sink"))?;
262    Ok((catalog, table))
263}
264
265/// Read every persisted row for this sink, recovering `prev_committed_epoch` and pending commits.
266async fn recovery(
267    db: &DatabaseConnection,
268    sink_id: SinkId,
269) -> Result<(Option<u64>, Vec<EpochCommit>)> {
270    fail::fail_point!("iceberg_v3_recovery_fail", |_| Err(anyhow::anyhow!(
271        "injected: iceberg_v3_recovery_fail"
272    )));
273    let rows = list_sink_states_ordered_by_epoch(db, sink_id)
274        .await
275        .context("list pending sink states for v3 recovery")?;
276
277    let mut prev_committed_epoch = None;
278    let mut pending = Vec::new();
279    let mut aborted_epochs = Vec::new();
280    for (epoch, state, metadata, _schema_change) in rows {
281        match state {
282            SinkState::Committed => {
283                prev_committed_epoch = Some(epoch);
284            }
285            SinkState::Pending => {
286                let blob = metadata.ok_or_else(|| {
287                    anyhow!("v3 pending row at epoch {} missing metadata blob", epoch)
288                })?;
289                let (merged, snapshot_id) = decode_pre_commit_state(&blob)
290                    .with_context(|| format!("decode v3 pre-commit state at epoch {}", epoch))?;
291                pending.push(EpochCommit {
292                    epoch,
293                    merged,
294                    snapshot_id,
295                });
296            }
297            SinkState::Aborted => {
298                // V3 doesn't produce Aborted rows; tolerate them defensively and drop them so they don't
299                // accumulate across restarts.
300                tracing::warn!(
301                    sink_id = %sink_id,
302                    epoch,
303                    "unexpected Aborted state in v3 recovery; cleaning up",
304                );
305                aborted_epochs.push(epoch);
306            }
307        }
308    }
309    if !aborted_epochs.is_empty()
310        && let Err(e) = clean_aborted_records(db, sink_id, aborted_epochs).await
311    {
312        // Best-effort cleanup; defer to next recovery if the DB rejects.
313        tracing::warn!(
314            error = %e.as_report(),
315            sink_id = %sink_id,
316            "failed to clean unexpected Aborted rows during v3 recovery",
317        );
318    }
319    Ok((prev_committed_epoch, pending))
320}
321
322async fn commit_one_epoch(
323    catalog: Arc<dyn Catalog>,
324    table_ident: iceberg::TableIdent,
325    target_branch: String,
326    commit: &EpochCommit,
327    retry_num: usize,
328) -> Result<Table, CommitError> {
329    let merged = commit.merged.clone();
330    let snapshot_id = commit.snapshot_id;
331
332    commit_retry::run_with_retry(
333        catalog.clone(),
334        table_ident,
335        merged.schema_id,
336        merged.partition_spec_id,
337        retry_num,
338        |table| {
339            let merged = merged.clone();
340            let catalog = catalog.clone();
341            let target_branch = target_branch.clone();
342            async move {
343                // Idempotency: if iceberg already saw this `snapshot_id`, skip the overwrite_files transaction.
344                if table
345                    .metadata()
346                    .snapshots()
347                    .any(|s| s.snapshot_id() == snapshot_id)
348                {
349                    return Ok(table);
350                }
351
352                let schema = table.metadata().current_schema();
353                let partition_spec = table
354                    .metadata()
355                    .partition_spec_by_id(merged.partition_spec_id)
356                    .ok_or_else(|| CommitError::Commit(anyhow!("partition spec not found")))?;
357                let partition_type = partition_spec
358                    .partition_type(schema)
359                    .map_err(|e| CommitError::Commit(anyhow!(e)))?;
360
361                let mut add_files: Vec<DataFile> = Vec::new();
362                let mut overwrite_files: Vec<DataFile> = Vec::new();
363                for serialized in merged.data_files.iter().chain(merged.delete_files.iter()) {
364                    let f = serialized
365                        .clone()
366                        .try_into(merged.partition_spec_id, &partition_type, schema)
367                        .map_err(|err| {
368                            CommitError::Commit(
369                                anyhow!(err).context("materialize v3 SerializedDataFile"),
370                            )
371                        })?;
372                    add_files.push(f);
373                }
374                for serialized in &merged.overwrite_files {
375                    let f = serialized
376                        .clone()
377                        .try_into(merged.partition_spec_id, &partition_type, schema)
378                        .map_err(|err| {
379                            CommitError::Commit(
380                                anyhow!(err).context("materialize v3 SerializedDataFile"),
381                            )
382                        })?;
383                    overwrite_files.push(f);
384                }
385
386                let txn = Transaction::new(&table);
387                let action = txn
388                    .overwrite_files()
389                    .set_snapshot_id(snapshot_id)
390                    .set_target_branch(target_branch)
391                    .add_data_files(add_files)
392                    .delete_files(overwrite_files);
393                let txn = action.apply(txn).map_err(|err| {
394                    CommitError::Commit(
395                        anyhow!(err).context("apply iceberg v3 overwrite_files action"),
396                    )
397                })?;
398                let table = txn.commit(catalog.as_ref()).await.map_err(|err| {
399                    CommitError::Commit(anyhow!(err).context("commit iceberg v3 transaction"))
400                })?;
401                Ok(table)
402            }
403        },
404    )
405    .await
406    .map_err(CommitError::Commit)
407}
408
/// One epoch's aggregated result across all writer and DV-merger reports. It is stored as JSON in
/// the `agg_result` field of [`PbIcebergV3PreCommitState`] when persisted to `pending_sink_state`.
#[derive(Clone, Serialize, Deserialize)]
struct IcebergV3AggResult {
    /// Schema id that every aggregated report agreed on.
    schema_id: i32,
    /// Partition spec id that every aggregated report agreed on; used to materialize the files.
    partition_spec_id: i32,
    /// Data files contributed by `Writer` reports.
    data_files: Vec<SerializedDataFile>,
    /// Delete files contributed by `DvMerger` reports; added alongside `data_files` at commit.
    delete_files: Vec<SerializedDataFile>,
    /// Files reported by `DvMerger` reports that the commit removes (passed to `delete_files(...)`
    /// on the `overwrite_files` action).
    overwrite_files: Vec<SerializedDataFile>,
}
417
418fn aggregate_reports(reports: &[PbIcebergV3SinkMetadata]) -> Result<IcebergV3AggResult> {
419    let mut shared_schema_id: Option<i32> = None;
420    let mut shared_partition_spec_id: Option<i32> = None;
421
422    let mut data_files: Vec<SerializedDataFile> = Vec::new();
423    let mut delete_files: Vec<SerializedDataFile> = Vec::new();
424    let mut overwrite_files: Vec<SerializedDataFile> = Vec::new();
425
426    if reports.is_empty() {
427        bail!("no reports to aggregate for iceberg v3 coordinator");
428    }
429
430    for r in reports {
431        let Some(meta) = &r.metadata else {
432            bail!("iceberg v3 sink report missing metadata in aggregate_reports");
433        };
434
435        // Validate role: explicitly-Unspecified is a wire-format bug.
436        let role = PbIcebergV3SinkRole::try_from(r.role)
437            .ok()
438            .filter(|r| !matches!(r, PbIcebergV3SinkRole::Unspecified))
439            .ok_or_else(|| anyhow!("iceberg v3 sink report has invalid role: {}", r.role))?;
440
441        match role {
442            PbIcebergV3SinkRole::Writer => {
443                let commit_result = IcebergCommitResult::try_from(meta)?;
444                align_report_id(
445                    commit_result.schema_id,
446                    commit_result.partition_spec_id,
447                    &mut shared_schema_id,
448                    &mut shared_partition_spec_id,
449                )?;
450                data_files.extend(commit_result.data_files);
451            }
452            PbIcebergV3SinkRole::DvMerger => {
453                let commit_result = IcebergDvMergerCommitResult::try_from(meta)
454                    .map_err(|e| anyhow!(e).context("decode v3 dv merger metadata"))?;
455                align_report_id(
456                    commit_result.schema_id,
457                    commit_result.partition_spec_id,
458                    &mut shared_schema_id,
459                    &mut shared_partition_spec_id,
460                )?;
461                delete_files.extend(commit_result.delete_files);
462                overwrite_files.extend(commit_result.overwrite_files);
463            }
464            _ => unreachable!(),
465        }
466    }
467
468    Ok(IcebergV3AggResult {
469        schema_id: shared_schema_id.unwrap(),
470        partition_spec_id: shared_partition_spec_id.unwrap(),
471        data_files,
472        delete_files,
473        overwrite_files,
474    })
475}
476
477fn align_report_id(
478    schema_id: i32,
479    partition_spec_id: i32,
480    shared_schema_id: &mut Option<i32>,
481    shared_partition_spec_id: &mut Option<i32>,
482) -> Result<()> {
483    match shared_schema_id {
484        Some(prev) if *prev != schema_id => {
485            bail!(
486                "iceberg v3 sink reports disagree on schema_id: {} vs {}",
487                prev,
488                schema_id
489            );
490        }
491        None => *shared_schema_id = Some(schema_id),
492        _ => {}
493    }
494    match shared_partition_spec_id {
495        Some(prev) if *prev != partition_spec_id => {
496            bail!(
497                "iceberg v3 sink reports disagree on partition_spec_id: {} vs {}",
498                prev,
499                partition_spec_id
500            );
501        }
502        None => *shared_partition_spec_id = Some(partition_spec_id),
503        _ => {}
504    }
505    Ok(())
506}
507
508fn encode_pre_commit_state(agg_result: &IcebergV3AggResult, snapshot_id: i64) -> Result<Vec<u8>> {
509    let agg_result = serde_json::to_vec(agg_result)?;
510    Ok(PbIcebergV3PreCommitState {
511        agg_result,
512        snapshot_id,
513    }
514    .encode_to_vec())
515}
516
517fn decode_pre_commit_state(blob: &[u8]) -> Result<(Arc<IcebergV3AggResult>, i64)> {
518    let state = PbIcebergV3PreCommitState::decode(blob)?;
519    let agg_result = Arc::new(serde_json::from_slice(&state.agg_result)?);
520    Ok((agg_result, state.snapshot_id))
521}