1use std::sync::Arc;
38use std::time::Duration;
39
40use anyhow::{Context, Result, anyhow, bail};
41use iceberg::Catalog;
42use iceberg::spec::{DataFile, FormatVersion, SerializedDataFile};
43use iceberg::table::Table;
44use iceberg::transaction::{ApplyTransactionAction, FastAppendAction, Transaction};
45use prost::Message;
46use risingwave_connector::sink::catalog::SinkId;
47use risingwave_connector::sink::iceberg::commit_retry::{self, CommitError};
48use risingwave_connector::sink::iceberg::{
49 IcebergCommitResult, IcebergConfig, IcebergDvMergerCommitResult, commit_branch,
50};
51use risingwave_meta_model::pending_sink_state::SinkState;
52use risingwave_pb::connector_service::PbIcebergV3PreCommitState;
53use risingwave_pb::stream_service::PbIcebergV3SinkRole;
54use risingwave_pb::stream_service::barrier_complete_response::PbIcebergV3SinkMetadata;
55use sea_orm::DatabaseConnection;
56use serde::{Deserialize, Serialize};
57use thiserror_ext::AsReport;
58use tokio::time::timeout;
59
60use super::backfill::backfill_dv_partitions;
61use crate::manager::exactly_once_util::{
62 clean_aborted_records, commit_and_prune_epoch, list_sink_states_ordered_by_epoch,
63 persist_pre_commit_metadata,
64};
65
/// Upper bound on how long `IcebergV3Coordinator::init` waits for
/// `load_catalog_and_table` to finish before failing with a timeout error.
const INIT_TIMEOUT: Duration = Duration::from_secs(60);
69
/// Everything needed to commit one epoch to the Iceberg table.
///
/// Built either by `IcebergV3Coordinator::pre_commit` (fresh epoch) or by `recovery`
/// (epoch whose pre-commit state was persisted but not yet marked committed).
#[derive(Clone)]
struct EpochCommit {
    /// Epoch this commit belongs to.
    epoch: u64,
    /// Aggregated files reported for the epoch; shared so retries can clone it cheaply.
    merged: Arc<IcebergV3AggResult>,
    /// Snapshot id chosen for this epoch. `commit_one_epoch` skips the commit when the
    /// table already contains a snapshot with this id.
    snapshot_id: i64,
}
79
/// Coordinates the two-phase (`pre_commit` / `commit`) Iceberg V3 commit for one sink.
pub struct IcebergV3Coordinator {
    /// Sink this coordinator commits for.
    sink_id: SinkId,
    /// Connection used to persist and prune per-epoch sink state.
    db: DatabaseConnection,
    /// Catalog the table is committed through.
    catalog: Arc<dyn Catalog>,
    /// Latest known table handle; replaced with the refreshed table after every commit.
    table: Table,
    /// Branch passed to every commit, derived from the sink config via `commit_branch`.
    target_branch: String,
    /// Retry budget handed to `commit_retry::run_with_retry`.
    retry_num: usize,
    /// Epoch staged by `pre_commit` and consumed by the next `commit`.
    waiting_commit: Option<EpochCommit>,
    /// Epoch of the most recent successful commit, if any.
    prev_committed_epoch: Option<u64>,
}
93
94impl IcebergV3Coordinator {
95 pub async fn init(
99 sink_id: SinkId,
100 iceberg_config: IcebergConfig,
101 db: DatabaseConnection,
102 ) -> Result<Self> {
103 let (catalog, table) = timeout(INIT_TIMEOUT, load_catalog_and_table(&iceberg_config))
104 .await
105 .map_err(|_| {
106 anyhow!(
107 "iceberg v3 coordinator for sink {} timed out after {}s loading iceberg catalog/table",
108 sink_id,
109 INIT_TIMEOUT.as_secs()
110 )
111 })?
112 .with_context(|| format!("init iceberg v3 coordinator for sink {}", sink_id))?;
113
114 let (prev_committed_epoch, recovered) = recovery(&db, sink_id)
115 .await
116 .with_context(|| format!("recover pending state for iceberg v3 sink {}", sink_id))?;
117
118 let target_branch =
119 commit_branch(iceberg_config.r#type.as_str(), iceberg_config.write_mode);
120
121 let mut coordinator = Self {
122 sink_id,
123 db,
124 catalog,
125 table,
126 target_branch,
127 retry_num: iceberg_config.commit_retry_num as usize,
128 waiting_commit: None,
129 prev_committed_epoch,
130 };
131
132 for commit in recovered {
133 coordinator.waiting_commit = Some(commit);
134 coordinator
135 .commit()
136 .await
137 .with_context(|| format!("drain recovered pending epoch for sink {}", sink_id))?;
138 }
139
140 Ok(coordinator)
141 }
142
143 pub async fn pre_commit(
144 &mut self,
145 prev_epoch: u64,
146 reports: Vec<PbIcebergV3SinkMetadata>,
147 ) -> Result<()> {
148 if reports.iter().all(|r| r.metadata.is_none()) {
149 return Ok(());
150 }
151
152 let merged = aggregate_reports(&reports)?;
153 if merged.data_files.is_empty() && merged.delete_files.is_empty() {
154 bail!("v3 sink epoch {} has no data files to commit", prev_epoch);
155 }
156 let merged = Arc::new(self.backfill_dv_partitions(merged)?);
157
158 let snapshot_id = FastAppendAction::generate_snapshot_id(&self.table);
159 let blob = encode_pre_commit_state(&merged, snapshot_id)?;
160 persist_pre_commit_metadata(&self.db, self.sink_id, prev_epoch, Some(blob), None).await?;
161
162 self.waiting_commit = Some(EpochCommit {
163 epoch: prev_epoch,
164 merged,
165 snapshot_id,
166 });
167 Ok(())
168 }
169
170 pub async fn commit(&mut self) -> Result<()> {
171 let Some(commit) = self.waiting_commit.take() else {
172 return Ok(());
173 };
174
175 let refreshed_table = commit_one_epoch(
176 self.catalog.clone(),
177 self.table.identifier().clone(),
178 self.target_branch.clone(),
179 &commit,
180 self.retry_num,
181 )
182 .await
183 .map_err(|err| {
184 let err_report = match err {
185 CommitError::Commit(e) | CommitError::ReloadTable(e) => e,
186 };
187 anyhow!(err_report).context(format!(
188 "iceberg v3 commit failed for sink {} epoch {}",
189 self.sink_id, commit.epoch
190 ))
191 })?;
192 self.table = refreshed_table;
193
194 commit_and_prune_epoch(
195 &self.db,
196 self.sink_id,
197 commit.epoch,
198 self.prev_committed_epoch,
199 )
200 .await
201 .with_context(|| {
202 format!(
203 "iceberg v3 mark_committed failed for sink {} epoch {}",
204 self.sink_id, commit.epoch
205 )
206 })?;
207
208 self.prev_committed_epoch = Some(commit.epoch);
209 Ok(())
210 }
211
212 fn backfill_dv_partitions(&self, merged: IcebergV3AggResult) -> Result<IcebergV3AggResult> {
213 let partition_spec = self
214 .table
215 .metadata()
216 .partition_spec_by_id(merged.partition_spec_id)
217 .context("find partition spec for v3 commit")?;
218 if partition_spec.is_unpartitioned() {
219 return Ok(merged);
220 }
221
222 let schema = self.table.metadata().current_schema();
223 let partition_type = partition_spec.partition_type(schema)?;
224 let data_files = merged
225 .data_files
226 .clone()
227 .into_iter()
228 .map(|f| f.try_into(merged.partition_spec_id, &partition_type, schema))
229 .try_collect::<Vec<_>>()?;
230 let mut delete_files = merged
231 .delete_files
232 .into_iter()
233 .map(|f| f.try_into(merged.partition_spec_id, &partition_type, schema))
234 .try_collect::<Vec<_>>()?;
235 backfill_dv_partitions(&data_files, &mut delete_files)?;
236 let delete_files = delete_files
237 .into_iter()
238 .map(|f| SerializedDataFile::try_from(f, &partition_type, FormatVersion::V3))
239 .try_collect()?;
240
241 Ok(IcebergV3AggResult {
242 schema_id: merged.schema_id,
243 partition_spec_id: merged.partition_spec_id,
244 data_files: merged.data_files,
245 delete_files,
246 overwrite_files: merged.overwrite_files,
247 })
248 }
249}
250
251async fn load_catalog_and_table(
252 iceberg_config: &IcebergConfig,
253) -> Result<(Arc<dyn Catalog>, Table)> {
254 let catalog = iceberg_config
255 .create_catalog()
256 .await
257 .map_err(|e| anyhow!(e).context("create iceberg catalog for v3 sink"))?;
258 let table = iceberg_config
259 .load_table()
260 .await
261 .map_err(|e| anyhow!(e).context("load iceberg table for v3 sink"))?;
262 Ok((catalog, table))
263}
264
265async fn recovery(
267 db: &DatabaseConnection,
268 sink_id: SinkId,
269) -> Result<(Option<u64>, Vec<EpochCommit>)> {
270 fail::fail_point!("iceberg_v3_recovery_fail", |_| Err(anyhow::anyhow!(
271 "injected: iceberg_v3_recovery_fail"
272 )));
273 let rows = list_sink_states_ordered_by_epoch(db, sink_id)
274 .await
275 .context("list pending sink states for v3 recovery")?;
276
277 let mut prev_committed_epoch = None;
278 let mut pending = Vec::new();
279 let mut aborted_epochs = Vec::new();
280 for (epoch, state, metadata, _schema_change) in rows {
281 match state {
282 SinkState::Committed => {
283 prev_committed_epoch = Some(epoch);
284 }
285 SinkState::Pending => {
286 let blob = metadata.ok_or_else(|| {
287 anyhow!("v3 pending row at epoch {} missing metadata blob", epoch)
288 })?;
289 let (merged, snapshot_id) = decode_pre_commit_state(&blob)
290 .with_context(|| format!("decode v3 pre-commit state at epoch {}", epoch))?;
291 pending.push(EpochCommit {
292 epoch,
293 merged,
294 snapshot_id,
295 });
296 }
297 SinkState::Aborted => {
298 tracing::warn!(
301 sink_id = %sink_id,
302 epoch,
303 "unexpected Aborted state in v3 recovery; cleaning up",
304 );
305 aborted_epochs.push(epoch);
306 }
307 }
308 }
309 if !aborted_epochs.is_empty()
310 && let Err(e) = clean_aborted_records(db, sink_id, aborted_epochs).await
311 {
312 tracing::warn!(
314 error = %e.as_report(),
315 sink_id = %sink_id,
316 "failed to clean unexpected Aborted rows during v3 recovery",
317 );
318 }
319 Ok((prev_committed_epoch, pending))
320}
321
322async fn commit_one_epoch(
323 catalog: Arc<dyn Catalog>,
324 table_ident: iceberg::TableIdent,
325 target_branch: String,
326 commit: &EpochCommit,
327 retry_num: usize,
328) -> Result<Table, CommitError> {
329 let merged = commit.merged.clone();
330 let snapshot_id = commit.snapshot_id;
331
332 commit_retry::run_with_retry(
333 catalog.clone(),
334 table_ident,
335 merged.schema_id,
336 merged.partition_spec_id,
337 retry_num,
338 |table| {
339 let merged = merged.clone();
340 let catalog = catalog.clone();
341 let target_branch = target_branch.clone();
342 async move {
343 if table
345 .metadata()
346 .snapshots()
347 .any(|s| s.snapshot_id() == snapshot_id)
348 {
349 return Ok(table);
350 }
351
352 let schema = table.metadata().current_schema();
353 let partition_spec = table
354 .metadata()
355 .partition_spec_by_id(merged.partition_spec_id)
356 .ok_or_else(|| CommitError::Commit(anyhow!("partition spec not found")))?;
357 let partition_type = partition_spec
358 .partition_type(schema)
359 .map_err(|e| CommitError::Commit(anyhow!(e)))?;
360
361 let mut add_files: Vec<DataFile> = Vec::new();
362 let mut overwrite_files: Vec<DataFile> = Vec::new();
363 for serialized in merged.data_files.iter().chain(merged.delete_files.iter()) {
364 let f = serialized
365 .clone()
366 .try_into(merged.partition_spec_id, &partition_type, schema)
367 .map_err(|err| {
368 CommitError::Commit(
369 anyhow!(err).context("materialize v3 SerializedDataFile"),
370 )
371 })?;
372 add_files.push(f);
373 }
374 for serialized in &merged.overwrite_files {
375 let f = serialized
376 .clone()
377 .try_into(merged.partition_spec_id, &partition_type, schema)
378 .map_err(|err| {
379 CommitError::Commit(
380 anyhow!(err).context("materialize v3 SerializedDataFile"),
381 )
382 })?;
383 overwrite_files.push(f);
384 }
385
386 let txn = Transaction::new(&table);
387 let action = txn
388 .overwrite_files()
389 .set_snapshot_id(snapshot_id)
390 .set_target_branch(target_branch)
391 .add_data_files(add_files)
392 .delete_files(overwrite_files);
393 let txn = action.apply(txn).map_err(|err| {
394 CommitError::Commit(
395 anyhow!(err).context("apply iceberg v3 overwrite_files action"),
396 )
397 })?;
398 let table = txn.commit(catalog.as_ref()).await.map_err(|err| {
399 CommitError::Commit(anyhow!(err).context("commit iceberg v3 transaction"))
400 })?;
401 Ok(table)
402 }
403 },
404 )
405 .await
406 .map_err(CommitError::Commit)
407}
408
/// Per-epoch aggregate of all sink reports, as produced by `aggregate_reports`.
///
/// Serialized to JSON inside the persisted pre-commit state, so field names are part of
/// the stored format.
#[derive(Clone, Serialize, Deserialize)]
struct IcebergV3AggResult {
    /// Schema id shared by every report of the epoch.
    schema_id: i32,
    /// Partition spec id shared by every report of the epoch.
    partition_spec_id: i32,
    /// Data files reported by writers.
    data_files: Vec<SerializedDataFile>,
    /// Delete files reported by DV mergers.
    delete_files: Vec<SerializedDataFile>,
    /// Files reported by DV mergers that the commit removes from the table.
    overwrite_files: Vec<SerializedDataFile>,
}
417
418fn aggregate_reports(reports: &[PbIcebergV3SinkMetadata]) -> Result<IcebergV3AggResult> {
419 let mut shared_schema_id: Option<i32> = None;
420 let mut shared_partition_spec_id: Option<i32> = None;
421
422 let mut data_files: Vec<SerializedDataFile> = Vec::new();
423 let mut delete_files: Vec<SerializedDataFile> = Vec::new();
424 let mut overwrite_files: Vec<SerializedDataFile> = Vec::new();
425
426 if reports.is_empty() {
427 bail!("no reports to aggregate for iceberg v3 coordinator");
428 }
429
430 for r in reports {
431 let Some(meta) = &r.metadata else {
432 bail!("iceberg v3 sink report missing metadata in aggregate_reports");
433 };
434
435 let role = PbIcebergV3SinkRole::try_from(r.role)
437 .ok()
438 .filter(|r| !matches!(r, PbIcebergV3SinkRole::Unspecified))
439 .ok_or_else(|| anyhow!("iceberg v3 sink report has invalid role: {}", r.role))?;
440
441 match role {
442 PbIcebergV3SinkRole::Writer => {
443 let commit_result = IcebergCommitResult::try_from(meta)?;
444 align_report_id(
445 commit_result.schema_id,
446 commit_result.partition_spec_id,
447 &mut shared_schema_id,
448 &mut shared_partition_spec_id,
449 )?;
450 data_files.extend(commit_result.data_files);
451 }
452 PbIcebergV3SinkRole::DvMerger => {
453 let commit_result = IcebergDvMergerCommitResult::try_from(meta)
454 .map_err(|e| anyhow!(e).context("decode v3 dv merger metadata"))?;
455 align_report_id(
456 commit_result.schema_id,
457 commit_result.partition_spec_id,
458 &mut shared_schema_id,
459 &mut shared_partition_spec_id,
460 )?;
461 delete_files.extend(commit_result.delete_files);
462 overwrite_files.extend(commit_result.overwrite_files);
463 }
464 _ => unreachable!(),
465 }
466 }
467
468 Ok(IcebergV3AggResult {
469 schema_id: shared_schema_id.unwrap(),
470 partition_spec_id: shared_partition_spec_id.unwrap(),
471 data_files,
472 delete_files,
473 overwrite_files,
474 })
475}
476
477fn align_report_id(
478 schema_id: i32,
479 partition_spec_id: i32,
480 shared_schema_id: &mut Option<i32>,
481 shared_partition_spec_id: &mut Option<i32>,
482) -> Result<()> {
483 match shared_schema_id {
484 Some(prev) if *prev != schema_id => {
485 bail!(
486 "iceberg v3 sink reports disagree on schema_id: {} vs {}",
487 prev,
488 schema_id
489 );
490 }
491 None => *shared_schema_id = Some(schema_id),
492 _ => {}
493 }
494 match shared_partition_spec_id {
495 Some(prev) if *prev != partition_spec_id => {
496 bail!(
497 "iceberg v3 sink reports disagree on partition_spec_id: {} vs {}",
498 prev,
499 partition_spec_id
500 );
501 }
502 None => *shared_partition_spec_id = Some(partition_spec_id),
503 _ => {}
504 }
505 Ok(())
506}
507
508fn encode_pre_commit_state(agg_result: &IcebergV3AggResult, snapshot_id: i64) -> Result<Vec<u8>> {
509 let agg_result = serde_json::to_vec(agg_result)?;
510 Ok(PbIcebergV3PreCommitState {
511 agg_result,
512 snapshot_id,
513 }
514 .encode_to_vec())
515}
516
517fn decode_pre_commit_state(blob: &[u8]) -> Result<(Arc<IcebergV3AggResult>, i64)> {
518 let state = PbIcebergV3PreCommitState::decode(blob)?;
519 let agg_result = Arc::new(serde_json::from_slice(&state.agg_result)?);
520 Ok((agg_result, state.snapshot_id))
521}