risingwave_connector/sink/iceberg/
commit.rs

1// Copyright 2026 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16use std::time::Duration;
17
18use anyhow::{Context, anyhow};
19use async_trait::async_trait;
20use iceberg::arrow::schema_to_arrow_schema;
21use iceberg::spec::{DataFile, Operation, SerializedDataFile, TableMetadata};
22use iceberg::table::Table;
23use iceberg::transaction::{ApplyTransactionAction, FastAppendAction, Transaction};
24use iceberg::{Catalog, TableIdent};
25use itertools::Itertools;
26use risingwave_common::array::arrow::arrow_schema_iceberg::{
27    DataType as ArrowDataType, Field as ArrowField, Fields as ArrowFields,
28};
29use risingwave_common::array::arrow::{IcebergArrowConvert, IcebergCreateTableArrowConvert};
30use risingwave_common::bail;
31use risingwave_common::catalog::Field;
32use risingwave_common::error::IcebergError;
33use risingwave_pb::connector_service::SinkMetadata;
34use risingwave_pb::connector_service::sink_metadata::Metadata::Serialized;
35use risingwave_pb::connector_service::sink_metadata::SerializedMetadata;
36use risingwave_pb::stream_plan::PbSinkSchemaChange;
37use serde_json::from_value;
38use thiserror_ext::AsReport;
39use tokio::sync::mpsc::UnboundedSender;
40use tokio_retry::RetryIf;
41use tokio_retry::strategy::{ExponentialBackoff, jitter};
42use tracing::warn;
43
44use super::{GLOBAL_SINK_METRICS, IcebergConfig, SinkError, commit_branch};
45use crate::connector_common::{IcebergCommittedSnapshot, IcebergSinkCompactionUpdate};
46use crate::sink::catalog::SinkId;
47use crate::sink::{Result, SinglePhaseCommitCoordinator, SinkParam, TwoPhaseCommitCoordinator};
48
/// JSON key for the Iceberg schema id carried in serialized commit metadata.
const SCHEMA_ID: &str = "schema_id";
/// JSON key for the Iceberg partition spec id carried in serialized commit metadata.
const PARTITION_SPEC_ID: &str = "partition_spec_id";
/// JSON key for the array of serialized data files carried in commit metadata.
const DATA_FILES: &str = "data_files";
52
/// The data files produced by one sink writer (one parallelism), together with
/// the schema id and partition spec id they were produced under.
///
/// The committer requires every result in a batch to share the same ids and
/// checks them against the table before committing. The struct is exchanged as
/// JSON through the `TryFrom` conversions in this file.
#[derive(Default, Clone)]
pub struct IcebergCommitResult {
    /// Iceberg schema id the data files refer to.
    pub schema_id: i32,
    /// Iceberg partition spec id the data files refer to.
    pub partition_spec_id: i32,
    /// Data files in their serializable form.
    pub data_files: Vec<SerializedDataFile>,
}
59
60impl IcebergCommitResult {
61    fn try_from(value: &SinkMetadata) -> Result<Self> {
62        if let Some(Serialized(v)) = &value.metadata {
63            let mut values = if let serde_json::Value::Object(v) =
64                serde_json::from_slice::<serde_json::Value>(&v.metadata)
65                    .context("Can't parse iceberg sink metadata")?
66            {
67                v
68            } else {
69                bail!("iceberg sink metadata should be an object");
70            };
71
72            let schema_id;
73            if let Some(serde_json::Value::Number(value)) = values.remove(SCHEMA_ID) {
74                schema_id = value
75                    .as_u64()
76                    .ok_or_else(|| anyhow!("schema_id should be a u64"))?;
77            } else {
78                bail!("iceberg sink metadata should have schema_id");
79            }
80
81            let partition_spec_id;
82            if let Some(serde_json::Value::Number(value)) = values.remove(PARTITION_SPEC_ID) {
83                partition_spec_id = value
84                    .as_u64()
85                    .ok_or_else(|| anyhow!("partition_spec_id should be a u64"))?;
86            } else {
87                bail!("iceberg sink metadata should have partition_spec_id");
88            }
89
90            let data_files: Vec<SerializedDataFile>;
91            if let serde_json::Value::Array(values) = values
92                .remove(DATA_FILES)
93                .ok_or_else(|| anyhow!("iceberg sink metadata should have data_files object"))?
94            {
95                data_files = values
96                    .into_iter()
97                    .map(from_value::<SerializedDataFile>)
98                    .collect::<std::result::Result<_, _>>()
99                    .unwrap();
100            } else {
101                bail!("iceberg sink metadata should have data_files object");
102            }
103
104            Ok(Self {
105                schema_id: schema_id as i32,
106                partition_spec_id: partition_spec_id as i32,
107                data_files,
108            })
109        } else {
110            bail!("Can't create iceberg sink write result from empty data!")
111        }
112    }
113
114    fn try_from_serialized_bytes(value: Vec<u8>) -> Result<Self> {
115        let mut values = if let serde_json::Value::Object(value) =
116            serde_json::from_slice::<serde_json::Value>(&value)
117                .context("Can't parse iceberg sink metadata")?
118        {
119            value
120        } else {
121            bail!("iceberg sink metadata should be an object");
122        };
123
124        let schema_id;
125        if let Some(serde_json::Value::Number(value)) = values.remove(SCHEMA_ID) {
126            schema_id = value
127                .as_u64()
128                .ok_or_else(|| anyhow!("schema_id should be a u64"))?;
129        } else {
130            bail!("iceberg sink metadata should have schema_id");
131        }
132
133        let partition_spec_id;
134        if let Some(serde_json::Value::Number(value)) = values.remove(PARTITION_SPEC_ID) {
135            partition_spec_id = value
136                .as_u64()
137                .ok_or_else(|| anyhow!("partition_spec_id should be a u64"))?;
138        } else {
139            bail!("iceberg sink metadata should have partition_spec_id");
140        }
141
142        let data_files: Vec<SerializedDataFile>;
143        if let serde_json::Value::Array(values) = values
144            .remove(DATA_FILES)
145            .ok_or_else(|| anyhow!("iceberg sink metadata should have data_files object"))?
146        {
147            data_files = values
148                .into_iter()
149                .map(from_value::<SerializedDataFile>)
150                .collect::<std::result::Result<_, _>>()
151                .unwrap();
152        } else {
153            bail!("iceberg sink metadata should have data_files object");
154        }
155
156        Ok(Self {
157            schema_id: schema_id as i32,
158            partition_spec_id: partition_spec_id as i32,
159            data_files,
160        })
161    }
162}
163
164impl<'a> TryFrom<&'a IcebergCommitResult> for SinkMetadata {
165    type Error = SinkError;
166
167    fn try_from(value: &'a IcebergCommitResult) -> std::result::Result<SinkMetadata, Self::Error> {
168        let json_data_files = serde_json::Value::Array(
169            value
170                .data_files
171                .iter()
172                .map(serde_json::to_value)
173                .collect::<std::result::Result<Vec<serde_json::Value>, _>>()
174                .context("Can't serialize data files to json")?,
175        );
176        let json_value = serde_json::Value::Object(
177            vec![
178                (
179                    SCHEMA_ID.to_owned(),
180                    serde_json::Value::Number(value.schema_id.into()),
181                ),
182                (
183                    PARTITION_SPEC_ID.to_owned(),
184                    serde_json::Value::Number(value.partition_spec_id.into()),
185                ),
186                (DATA_FILES.to_owned(), json_data_files),
187            ]
188            .into_iter()
189            .collect(),
190        );
191        Ok(SinkMetadata {
192            metadata: Some(Serialized(SerializedMetadata {
193                metadata: serde_json::to_vec(&json_value)
194                    .context("Can't serialize iceberg sink metadata")?,
195            })),
196        })
197    }
198}
199
200impl TryFrom<IcebergCommitResult> for Vec<u8> {
201    type Error = SinkError;
202
203    fn try_from(value: IcebergCommitResult) -> std::result::Result<Vec<u8>, Self::Error> {
204        let json_data_files = serde_json::Value::Array(
205            value
206                .data_files
207                .iter()
208                .map(serde_json::to_value)
209                .collect::<std::result::Result<Vec<serde_json::Value>, _>>()
210                .context("Can't serialize data files to json")?,
211        );
212        let json_value = serde_json::Value::Object(
213            vec![
214                (
215                    SCHEMA_ID.to_owned(),
216                    serde_json::Value::Number(value.schema_id.into()),
217                ),
218                (
219                    PARTITION_SPEC_ID.to_owned(),
220                    serde_json::Value::Number(value.partition_spec_id.into()),
221                ),
222                (DATA_FILES.to_owned(), json_data_files),
223            ]
224            .into_iter()
225            .collect(),
226        );
227        Ok(serde_json::to_vec(&json_value).context("Can't serialize iceberg sink metadata")?)
228    }
229}
230
231fn arrow_data_type_compatible(current: &ArrowDataType, expected: &ArrowDataType) -> bool {
232    use ArrowDataType::*;
233
234    match (current, expected) {
235        // RW Decimal has no precision/scale. Sink creation already accepts any
236        // table Decimal128 precision/scale, while expected schemas here are
237        // generated from RW columns using the same canonical mapping. Ignoring
238        // both values prevents false schema-state ambiguity and cannot mask an
239        // auto schema change, which only supports add/drop columns.
240        (Decimal128(_, _), Decimal128(_, _)) => true,
241        (Binary, LargeBinary) | (LargeBinary, Binary) => true,
242        (List(current_field), List(expected_field)) => {
243            arrow_data_type_compatible(current_field.data_type(), expected_field.data_type())
244        }
245        (Map(current_field, current_sorted), Map(expected_field, expected_sorted)) => {
246            current_sorted == expected_sorted
247                && arrow_data_type_compatible(current_field.data_type(), expected_field.data_type())
248        }
249        (Struct(current_fields), Struct(expected_fields)) => {
250            let expected_fields = expected_fields
251                .iter()
252                .map(|field| field.as_ref().clone())
253                .collect_vec();
254            schema_contains_same_fields(current_fields, &expected_fields)
255        }
256        _ => current == expected,
257    }
258}
259
260fn schema_contains_same_fields(current: &ArrowFields, expected: &[ArrowField]) -> bool {
261    if current.len() != expected.len() {
262        return false;
263    }
264
265    let mut unmatched_current = current.iter().collect_vec();
266    expected.iter().all(|expected_field| {
267        let Some(pos) = unmatched_current.iter().position(|current_field| {
268            current_field.name() == expected_field.name()
269                && arrow_data_type_compatible(current_field.data_type(), expected_field.data_type())
270        }) else {
271            return false;
272        };
273        unmatched_current.swap_remove(pos);
274        true
275    })
276}
277
/// Coordinator that commits the data files reported by Iceberg sink writers to
/// the target Iceberg table.
///
/// It implements both [`SinglePhaseCommitCoordinator`] and
/// [`TwoPhaseCommitCoordinator`].
pub struct IcebergSinkCommitter {
    /// Catalog used to reload the table and to commit transactions.
    pub(super) catalog: Arc<dyn Catalog>,
    /// Cached table handle. It is replaced by a freshly loaded table on every
    /// data commit.
    pub(super) table: Table,
    /// NOTE(review): not read in the visible part of this file; presumably the
    /// epoch of the last successful commit — confirm.
    pub last_commit_epoch: u64,
    /// Sink id, attached to compaction updates and log records.
    pub(crate) sink_id: SinkId,
    /// Sink configuration (catalog, table type, write mode, ...).
    pub(crate) config: IcebergConfig,
    /// Sink parameters. The sink name is used as a metrics label.
    pub(crate) param: SinkParam,
    /// Maximum number of retries after a failed commit attempt.
    pub(super) commit_retry_num: u32,
    /// Optional channel that receives compaction updates (e.g. after each
    /// successful data commit).
    pub(crate) iceberg_compact_stat_sender: Option<UnboundedSender<IcebergSinkCompactionUpdate>>,
}
288
289impl IcebergSinkCommitter {
290    fn latest_observed_snapshot(&self) -> Option<IcebergCommittedSnapshot> {
291        let branch = commit_branch(self.config.r#type.as_str(), self.config.write_mode);
292        self.table
293            .metadata()
294            .snapshot_for_ref(&branch)
295            .map(|snapshot| IcebergCommittedSnapshot {
296                branch,
297                snapshot_id: snapshot.snapshot_id(),
298                timestamp_ms: snapshot.timestamp_ms(),
299            })
300    }
301
302    fn notify_iceberg_compaction_scheduler(&self, force_compaction: bool) {
303        let Some(iceberg_compact_stat_sender) = &self.iceberg_compact_stat_sender else {
304            return;
305        };
306
307        let Some(observed_snapshot) = self.latest_observed_snapshot() else {
308            warn!(
309                sink_id = %self.sink_id,
310                "skip iceberg compaction update because no observed snapshot is available"
311            );
312            return;
313        };
314
315        let observed_snapshot_id = observed_snapshot.snapshot_id;
316        let observed_snapshot_timestamp_ms = observed_snapshot.timestamp_ms;
317        let observed_snapshot_branch = observed_snapshot.branch.clone();
318
319        if iceberg_compact_stat_sender
320            .send(IcebergSinkCompactionUpdate {
321                sink_id: self.sink_id,
322                force_compaction,
323                observed_snapshot,
324            })
325            .is_err()
326        {
327            warn!(
328                sink_id = %self.sink_id,
329                force_compaction,
330                observed_snapshot_id,
331                observed_snapshot_timestamp_ms,
332                observed_snapshot_branch = %observed_snapshot_branch,
333                "failed to send iceberg compaction update"
334            );
335        }
336    }
337
338    // Reload table and guarantee current schema_id and partition_spec_id matches
339    // given `schema_id` and `partition_spec_id`
340    async fn reload_table(
341        catalog: &dyn Catalog,
342        table_ident: &TableIdent,
343        schema_id: i32,
344        partition_spec_id: i32,
345    ) -> Result<Table> {
346        let table = catalog
347            .load_table(table_ident)
348            .await
349            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
350        if table.metadata().current_schema_id() != schema_id {
351            return Err(SinkError::Iceberg(anyhow!(
352                "Schema evolution not supported, expect schema id {}, but got {}",
353                schema_id,
354                table.metadata().current_schema_id()
355            )));
356        }
357        if table.metadata().default_partition_spec_id() != partition_spec_id {
358            return Err(SinkError::Iceberg(anyhow!(
359                "Partition evolution not supported, expect partition spec id {}, but got {}",
360                partition_spec_id,
361                table.metadata().default_partition_spec_id()
362            )));
363        }
364        Ok(table)
365    }
366}
367
368#[async_trait]
369impl SinglePhaseCommitCoordinator for IcebergSinkCommitter {
370    async fn init(&mut self) -> Result<()> {
371        tracing::info!(
372            sink_id = %self.param.sink_id,
373            "Iceberg sink coordinator initialized",
374        );
375
376        Ok(())
377    }
378
379    async fn commit_data(&mut self, epoch: u64, metadata: Vec<SinkMetadata>) -> Result<()> {
380        tracing::debug!("Starting iceberg direct commit in epoch {epoch}");
381
382        if metadata.is_empty() {
383            tracing::debug!(?epoch, "No datafile to commit");
384            return Ok(());
385        }
386
387        // Commit data if present
388        if let Some((write_results, snapshot_id)) = self.pre_commit_inner(epoch, metadata)? {
389            self.commit_data_impl(epoch, write_results, snapshot_id)
390                .await?;
391        }
392
393        Ok(())
394    }
395
396    async fn commit_schema_change(
397        &mut self,
398        epoch: u64,
399        schema_change: PbSinkSchemaChange,
400    ) -> Result<()> {
401        tracing::info!(
402            "Committing schema change {:?} in epoch {}",
403            schema_change,
404            epoch
405        );
406        self.commit_schema_change_impl(schema_change).await?;
407        tracing::info!("Successfully committed schema change in epoch {}", epoch);
408
409        Ok(())
410    }
411}
412
413#[async_trait]
414impl TwoPhaseCommitCoordinator for IcebergSinkCommitter {
415    async fn init(&mut self) -> Result<()> {
416        tracing::info!(
417            sink_id = %self.param.sink_id,
418            "Iceberg sink coordinator initialized",
419        );
420
421        Ok(())
422    }
423
424    async fn pre_commit(
425        &mut self,
426        epoch: u64,
427        metadata: Vec<SinkMetadata>,
428        _schema_change: Option<PbSinkSchemaChange>,
429    ) -> Result<Option<Vec<u8>>> {
430        tracing::debug!("Starting iceberg pre commit in epoch {epoch}");
431
432        let (write_results, snapshot_id) = match self.pre_commit_inner(epoch, metadata)? {
433            Some((write_results, snapshot_id)) => (write_results, snapshot_id),
434            None => {
435                tracing::debug!(?epoch, "no data to pre commit");
436                return Ok(None);
437            }
438        };
439
440        let mut write_results_bytes = Vec::new();
441        for each_parallelism_write_result in write_results {
442            let each_parallelism_write_result_bytes: Vec<u8> =
443                each_parallelism_write_result.try_into()?;
444            write_results_bytes.push(each_parallelism_write_result_bytes);
445        }
446
447        let snapshot_id_bytes: Vec<u8> = snapshot_id.to_le_bytes().to_vec();
448        write_results_bytes.push(snapshot_id_bytes);
449
450        let pre_commit_metadata_bytes: Vec<u8> = serialize_metadata(write_results_bytes);
451        Ok(Some(pre_commit_metadata_bytes))
452    }
453
454    async fn commit_data(&mut self, epoch: u64, commit_metadata: Vec<u8>) -> Result<()> {
455        tracing::debug!("Starting iceberg commit in epoch {epoch}");
456
457        if commit_metadata.is_empty() {
458            tracing::debug!(?epoch, "No datafile to commit");
459            return Ok(());
460        }
461
462        // Deserialize commit metadata
463        let mut payload = deserialize_metadata(commit_metadata);
464        if payload.is_empty() {
465            return Err(SinkError::Iceberg(anyhow!(
466                "Invalid commit metadata: empty payload"
467            )));
468        }
469
470        // Last element is snapshot_id
471        let snapshot_id_bytes = payload.pop().ok_or_else(|| {
472            SinkError::Iceberg(anyhow!("Invalid commit metadata: missing snapshot_id"))
473        })?;
474        let snapshot_id = i64::from_le_bytes(
475            snapshot_id_bytes
476                .try_into()
477                .map_err(|_| SinkError::Iceberg(anyhow!("Invalid snapshot id bytes")))?,
478        );
479
480        // Remaining elements are write_results
481        let write_results = payload
482            .into_iter()
483            .map(IcebergCommitResult::try_from_serialized_bytes)
484            .collect::<Result<Vec<_>>>()?;
485
486        let snapshot_committed = self
487            .is_snapshot_id_in_iceberg(&self.config, snapshot_id)
488            .await?;
489
490        if snapshot_committed {
491            tracing::info!(
492                "Snapshot id {} already committed in iceberg table, skip committing again.",
493                snapshot_id
494            );
495            return Ok(());
496        }
497
498        self.commit_data_impl(epoch, write_results, snapshot_id)
499            .await
500    }
501
502    async fn commit_schema_change(
503        &mut self,
504        epoch: u64,
505        schema_change: PbSinkSchemaChange,
506    ) -> Result<()> {
507        let schema_updated = self.check_schema_change_applied(&schema_change)?;
508        if schema_updated {
509            tracing::info!("Schema change already committed in epoch {}, skip", epoch);
510            return Ok(());
511        }
512
513        tracing::info!(
514            "Committing schema change {:?} in epoch {}",
515            schema_change,
516            epoch
517        );
518        self.commit_schema_change_impl(schema_change).await?;
519        tracing::info!("Successfully committed schema change in epoch {epoch}");
520
521        Ok(())
522    }
523
524    async fn abort(&mut self, _epoch: u64, _commit_metadata: Vec<u8>) {
525        // TODO: Files that have been written but not committed should be deleted.
526        tracing::debug!("Abort not implemented yet");
527    }
528}
529
530/// Methods Required to Achieve Exactly Once Semantics
531impl IcebergSinkCommitter {
532    fn pre_commit_inner(
533        &mut self,
534        _epoch: u64,
535        metadata: Vec<SinkMetadata>,
536    ) -> Result<Option<(Vec<IcebergCommitResult>, i64)>> {
537        let write_results: Vec<IcebergCommitResult> = metadata
538            .iter()
539            .map(IcebergCommitResult::try_from)
540            .collect::<Result<Vec<IcebergCommitResult>>>()?;
541
542        // Skip if no data to commit
543        if write_results.is_empty() || write_results.iter().all(|r| r.data_files.is_empty()) {
544            return Ok(None);
545        }
546
547        let expect_schema_id = write_results[0].schema_id;
548        let expect_partition_spec_id = write_results[0].partition_spec_id;
549
550        // guarantee that all write results has same schema_id and partition_spec_id
551        if write_results
552            .iter()
553            .any(|r| r.schema_id != expect_schema_id)
554            || write_results
555                .iter()
556                .any(|r| r.partition_spec_id != expect_partition_spec_id)
557        {
558            return Err(SinkError::Iceberg(anyhow!(
559                "schema_id and partition_spec_id should be the same in all write results"
560            )));
561        }
562
563        let snapshot_id = FastAppendAction::generate_snapshot_id(&self.table);
564
565        Ok(Some((write_results, snapshot_id)))
566    }
567
    /// Appends the data files from `write_results` to the Iceberg table as one
    /// fast-append snapshot with id `snapshot_id`, on this sink's commit branch.
    ///
    /// On success it also:
    /// - replaces the cached `self.table` with the committed table;
    /// - updates the snapshot-count metric;
    /// - notifies the compaction scheduler.
    ///
    /// # Panics
    /// Panics if `write_results` is empty or contains no data files. Callers
    /// filter these cases out beforehand (see `pre_commit_inner`).
    ///
    /// # Errors
    /// Fails in any of these cases:
    /// - the table's current schema id or default partition spec id differs from
    ///   the ids in `write_results[0]`;
    /// - the data files cannot be converted;
    /// - the commit still fails after `commit_retry_num` retries.
    async fn commit_data_impl(
        &mut self,
        epoch: u64,
        write_results: Vec<IcebergCommitResult>,
        snapshot_id: i64,
    ) -> Result<()> {
        // Empty write results should be handled before calling this function.
        assert!(
            !write_results.is_empty() && !write_results.iter().all(|r| r.data_files.is_empty())
        );

        // Check the snapshot limit before proceeding with the commit.
        // NOTE(review): `wait_for_snapshot_limit` is defined outside this view.
        self.wait_for_snapshot_limit().await?;

        // `pre_commit_inner` guarantees that all results share these ids.
        let expect_schema_id = write_results[0].schema_id;
        let expect_partition_spec_id = write_results[0].partition_spec_id;

        // Load the latest table to avoid concurrent modification with the best effort.
        // This also yields the schema and partition spec needed to decode the data files below.
        self.table = Self::reload_table(
            self.catalog.as_ref(),
            self.table.identifier(),
            expect_schema_id,
            expect_partition_spec_id,
        )
        .await?;

        let Some(schema) = self.table.metadata().schema_by_id(expect_schema_id) else {
            return Err(SinkError::Iceberg(anyhow!(
                "Can't find schema by id {}",
                expect_schema_id
            )));
        };
        let Some(partition_spec) = self
            .table
            .metadata()
            .partition_spec_by_id(expect_partition_spec_id)
        else {
            return Err(SinkError::Iceberg(anyhow!(
                "Can't find partition spec by id {}",
                expect_partition_spec_id
            )));
        };
        let partition_type = partition_spec
            .as_ref()
            .clone()
            .partition_type(schema)
            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;

        // Turn every serialized file from every writer into a `DataFile`,
        // stopping at the first conversion error.
        let data_files = write_results
            .into_iter()
            .flat_map(|r| {
                r.data_files.into_iter().map(|f| {
                    f.try_into(expect_partition_spec_id, &partition_type, schema)
                        .map_err(|err| SinkError::Iceberg(anyhow!(err)))
                })
            })
            .collect::<Result<Vec<DataFile>>>()?;
        // # TODO:
        // This retry behavior should be reverted and moved into iceberg-rust once it
        // supports retries (tracked in: https://github.com/apache/iceberg-rust/issues/964),
        // because retrying involves reapplying the commit metadata.
        // For now, we just retry the commit operation.
        //
        // Delays grow exponentially from 10 ms, are capped at 60 s, are randomized
        // by `jitter`, and at most `commit_retry_num` retries follow the first attempt.
        let retry_strategy = ExponentialBackoff::from_millis(10)
            .max_delay(Duration::from_secs(60))
            .map(jitter)
            .take(self.commit_retry_num as usize);
        let catalog = self.catalog.clone();
        let table_ident = self.table.identifier().clone();

        // Custom retry logic that:
        // 1. Calls reload_table before each commit attempt to get the latest metadata
        // 2. If reload_table fails (table not exists/schema/partition mismatch), stops retrying immediately
        // 3. If commit fails, retries with backoff
        // NOTE(review): any `load_table` failure inside `reload_table` (including a possibly
        // transient catalog error) is also classified as `ReloadTable` and therefore not retried.
        enum CommitError {
            ReloadTable(SinkError), // Non-retriable: schema/partition mismatch
            Commit(SinkError),      // Retriable: commit conflicts, network errors
        }

        let table = RetryIf::spawn(
            retry_strategy,
            || async {
                // Reload table before each commit attempt to get the latest metadata
                let table = Self::reload_table(
                    catalog.as_ref(),
                    &table_ident,
                    expect_schema_id,
                    expect_partition_spec_id,
                )
                .await
                .map_err(|e| {
                    tracing::error!(error = %e.as_report(), "Failed to reload iceberg table");
                    CommitError::ReloadTable(e)
                })?;

                // Every attempt reuses the same pre-generated snapshot id and a clone of the files.
                let txn = Transaction::new(&table);
                let append_action = txn
                    .fast_append()
                    .set_snapshot_id(snapshot_id)
                    .set_target_branch(commit_branch(
                        self.config.r#type.as_str(),
                        self.config.write_mode,
                    ))
                    .add_data_files(data_files.clone());

                let tx = append_action.apply(txn).map_err(|err| {
                    let err: IcebergError = err.into();
                    tracing::error!(error = %err.as_report(), "Failed to apply iceberg table");
                    CommitError::Commit(SinkError::Iceberg(anyhow!(err)))
                })?;

                tx.commit(catalog.as_ref()).await.map_err(|err| {
                    let err: IcebergError = err.into();
                    tracing::error!(error = %err.as_report(), "Failed to commit iceberg table");
                    CommitError::Commit(SinkError::Iceberg(anyhow!(err)))
                })
            },
            |err: &CommitError| {
                // Only retry on commit errors, not on reload_table errors
                match err {
                    CommitError::Commit(_) => {
                        tracing::warn!("Commit failed, will retry");
                        true
                    }
                    CommitError::ReloadTable(_) => {
                        tracing::error!(
                            "reload_table failed with non-retriable error, will not retry"
                        );
                        false
                    }
                }
            },
        )
        .await
        .map_err(|e| match e {
            CommitError::ReloadTable(e) | CommitError::Commit(e) => e,
        })?;
        // Keep the table returned by the successful commit as the cached view.
        self.table = table;

        let snapshot_num = self.table.metadata().snapshots().count();
        let catalog_name = self.config.common.catalog_name();
        let table_name = self.table.identifier().to_string();
        let metrics_labels = [&self.param.sink_name, &catalog_name, &table_name];
        GLOBAL_SINK_METRICS
            .iceberg_snapshot_num
            .with_guarded_label_values(&metrics_labels)
            .set(snapshot_num as i64);

        tracing::debug!("Succeeded to commit to iceberg table in epoch {epoch}.");

        // A regular (non-forced) compaction update after every successful data commit.
        self.notify_iceberg_compaction_scheduler(false);

        Ok(())
    }
720
721    /// During pre-commit metadata, we record the `snapshot_id` corresponding to each batch of files.
722    /// Therefore, the logic for checking whether all files in this batch are present in Iceberg
723    /// has been changed to verifying if their corresponding `snapshot_id` exists in Iceberg.
724    async fn is_snapshot_id_in_iceberg(
725        &self,
726        iceberg_config: &IcebergConfig,
727        snapshot_id: i64,
728    ) -> Result<bool> {
729        let table = iceberg_config.load_table().await?;
730        if table.metadata().snapshot_by_id(snapshot_id).is_some() {
731            Ok(true)
732        } else {
733            Ok(false)
734        }
735    }
736
737    /// Check if the specified columns already exist in the iceberg table's current schema.
738    /// This is used to determine if schema change has already been applied.
739    fn check_schema_change_applied(&self, schema_change: &PbSinkSchemaChange) -> Result<bool> {
740        let current_schema = self.table.metadata().current_schema();
741        let current_arrow_schema = schema_to_arrow_schema(current_schema.as_ref())
742            .context("Failed to convert schema")
743            .map_err(SinkError::Iceberg)?;
744
745        let iceberg_arrow_convert = IcebergArrowConvert;
746
747        let schema_matches = |expected: &[ArrowField]| {
748            schema_contains_same_fields(current_arrow_schema.fields(), expected)
749        };
750
751        let original_arrow_fields: Vec<ArrowField> = schema_change
752            .original_schema
753            .iter()
754            .map(|pb_field| {
755                let field = Field::from(pb_field);
756                iceberg_arrow_convert
757                    .to_arrow_field(&field.name, &field.data_type)
758                    .context("Failed to convert field to arrow")
759                    .map_err(SinkError::Iceberg)
760            })
761            .collect::<Result<_>>()?;
762
763        // If current schema equals original_schema, then schema change is NOT applied.
764        if schema_matches(&original_arrow_fields) {
765            tracing::debug!(
766                "Current iceberg schema matches original_schema ({} columns); schema change not applied",
767                original_arrow_fields.len()
768            );
769            return Ok(false);
770        }
771
772        let expected_after_change = match schema_change.op.as_ref() {
773            Some(risingwave_pb::stream_plan::sink_schema_change::Op::AddColumns(
774                add_columns_op,
775            )) => {
776                let add_arrow_fields: Vec<ArrowField> = add_columns_op
777                    .fields
778                    .iter()
779                    .map(|pb_field| {
780                        let field = Field::from(pb_field);
781                        iceberg_arrow_convert
782                            .to_arrow_field(&field.name, &field.data_type)
783                            .context("Failed to convert field to arrow")
784                            .map_err(SinkError::Iceberg)
785                    })
786                    .collect::<Result<_>>()?;
787
788                let mut expected_after_change = original_arrow_fields;
789                expected_after_change.extend(add_arrow_fields);
790                expected_after_change
791            }
792            Some(risingwave_pb::stream_plan::sink_schema_change::Op::DropColumns(
793                drop_columns_op,
794            )) => original_arrow_fields
795                .into_iter()
796                .filter(|field| {
797                    !drop_columns_op
798                        .column_names
799                        .iter()
800                        .any(|name| name == field.name())
801                })
802                .collect_vec(),
803            _ => {
804                return Err(SinkError::Iceberg(anyhow!(
805                    "Unsupported sink schema change op in iceberg sink: {:?}",
806                    schema_change.op
807                )));
808            }
809        };
810
811        // If current schema equals the changed schema, then schema change is applied.
812        if schema_matches(&expected_after_change) {
813            tracing::debug!(
814                "Current iceberg schema matches changed schema ({} columns); schema change already applied",
815                expected_after_change.len()
816            );
817            return Ok(true);
818        }
819
820        Err(SinkError::Iceberg(anyhow!(
821            "Current iceberg schema does not match either original_schema ({} cols) or changed schema; cannot determine whether schema change is applied",
822            schema_change.original_schema.len()
823        )))
824    }
825
826    /// Commit schema changes (e.g., add columns) to the iceberg table.
827    /// This function uses Transaction API to atomically update the table schema
828    /// with optimistic locking to prevent concurrent conflicts.
829    async fn commit_schema_change_impl(&mut self, schema_change: PbSinkSchemaChange) -> Result<()> {
830        use iceberg::spec::NestedField;
831
832        // Step 1: Get current table metadata
833        let metadata = self.table.metadata();
834        let mut next_field_id = metadata.last_column_id() + 1;
835        tracing::debug!("Starting schema change, next_field_id: {}", next_field_id);
836
837        // Step 2: Build new fields to add
838        let iceberg_create_table_arrow_convert = IcebergCreateTableArrowConvert::default();
839        let mut new_fields = Vec::new();
840
841        let mut drop_column_names = Vec::new();
842        match schema_change.op.as_ref() {
843            Some(risingwave_pb::stream_plan::sink_schema_change::Op::AddColumns(
844                add_columns_op,
845            )) => {
846                let add_columns = add_columns_op.fields.iter().map(Field::from).collect_vec();
847                for field in &add_columns {
848                    // Convert RisingWave Field to Arrow Field using IcebergCreateTableArrowConvert
849                    let arrow_field = iceberg_create_table_arrow_convert
850                        .to_arrow_field(&field.name, &field.data_type)
851                        .with_context(|| {
852                            format!("Failed to convert field '{}' to arrow", field.name)
853                        })
854                        .map_err(SinkError::Iceberg)?;
855
856                    // Convert Arrow DataType to Iceberg Type
857                    let iceberg_type = iceberg::arrow::arrow_type_to_type(arrow_field.data_type())
858                        .map_err(|err| {
859                            SinkError::Iceberg(
860                                anyhow!(err)
861                                    .context("Failed to convert Arrow type to Iceberg type"),
862                            )
863                        })?;
864
865                    // Create NestedField with the next available field ID
866                    let nested_field = Arc::new(NestedField::optional(
867                        next_field_id,
868                        &field.name,
869                        iceberg_type,
870                    ));
871
872                    new_fields.push(nested_field);
873                    tracing::info!("Prepared field '{}' with ID {}", field.name, next_field_id);
874                    next_field_id += 1;
875                }
876            }
877            Some(risingwave_pb::stream_plan::sink_schema_change::Op::DropColumns(
878                drop_columns_op,
879            )) => {
880                drop_column_names = drop_columns_op.column_names.clone();
881            }
882            _ => {
883                return Err(SinkError::Iceberg(anyhow!(
884                    "Unsupported sink schema change op in iceberg sink: {:?}",
885                    schema_change.op
886                )));
887            }
888        }
889
890        // Step 3: Create Transaction with UpdateSchemaAction
891        tracing::info!(
892            "Committing schema change to catalog for table {}",
893            self.table.identifier()
894        );
895
896        let txn = Transaction::new(&self.table);
897        let action_fields_added = new_fields.len();
898        let action = txn
899            .update_schema()
900            .add_fields(new_fields)
901            .drop_fields(drop_column_names.clone());
902
903        let updated_table = action
904            .apply(txn)
905            .context("Failed to apply schema update action")
906            .map_err(SinkError::Iceberg)?
907            .commit(self.catalog.as_ref())
908            .await
909            .context("Failed to commit table schema change")
910            .map_err(SinkError::Iceberg)?;
911
912        self.table = updated_table;
913
914        tracing::info!(
915            "Successfully committed schema change, added {} columns and dropped {} columns from iceberg table",
916            action_fields_added,
917            drop_column_names.len()
918        );
919
920        Ok(())
921    }
922
923    /// Check the number of snapshots on the given branch lineage since the last rewrite operation.
924    /// Returns the number of snapshots since the last rewrite.
925    fn count_snapshots_since_rewrite_in_metadata(metadata: &TableMetadata, branch: &str) -> usize {
926        // Start from the latest snapshot of the commit branch.
927        let mut snapshot_id = metadata
928            .snapshot_for_ref(branch)
929            .map(|snapshot| snapshot.snapshot_id());
930        let mut count = 0;
931
932        // Iterate through snapshots by parent lineage to find the last rewrite.
933        while let Some(current_snapshot_id) = snapshot_id {
934            let Some(snapshot) = metadata.snapshot_by_id(current_snapshot_id) else {
935                break;
936            };
937
938            // Check if this snapshot represents a rewrite operation.
939            if snapshot.summary().operation == Operation::Replace {
940                // Found a rewrite operation, stop counting.
941                break;
942            }
943
944            // Increment count for each snapshot that is not a rewrite.
945            count += 1;
946            snapshot_id = snapshot.parent_snapshot_id();
947        }
948
949        count
950    }
951
952    /// Returns the number of snapshots in the current commit branch since the last rewrite.
953    fn count_snapshots_since_rewrite(&self) -> usize {
954        let branch = commit_branch(self.config.r#type.as_str(), self.config.write_mode);
955        Self::count_snapshots_since_rewrite_in_metadata(self.table.metadata(), branch.as_str())
956    }
957
958    /// Wait until snapshot count since last rewrite is below the limit
959    async fn wait_for_snapshot_limit(&mut self) -> Result<()> {
960        if let Some(max_snapshots) = self.config.max_snapshots_num_before_compaction {
961            loop {
962                let current_count = self.count_snapshots_since_rewrite();
963
964                if current_count < max_snapshots {
965                    tracing::info!(
966                        "Snapshot count check passed: {} < {}",
967                        current_count,
968                        max_snapshots
969                    );
970                    break;
971                }
972
973                tracing::info!(
974                    "Snapshot count {} exceeds limit {}, waiting...",
975                    current_count,
976                    max_snapshots
977                );
978
979                self.notify_iceberg_compaction_scheduler(true);
980
981                // Wait for 30 seconds before checking again
982                tokio::time::sleep(Duration::from_secs(30)).await;
983
984                // Refresh table after the wait so the next check sees latest snapshots.
985                self.table = self.config.load_table().await?;
986            }
987        }
988        Ok(())
989    }
990}
991
992fn serialize_metadata(metadata: Vec<Vec<u8>>) -> Vec<u8> {
993    serde_json::to_vec(&metadata).unwrap()
994}
995
996fn deserialize_metadata(bytes: Vec<u8>) -> Vec<Vec<u8>> {
997    serde_json::from_slice(&bytes).unwrap()
998}
999
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use iceberg::spec::{
        FormatVersion, MAIN_BRANCH, NestedField, PrimitiveType, Schema, Snapshot,
        SnapshotReference, SnapshotRetention, SortOrder, Summary, TableMetadataBuilder, Type,
        UnboundPartitionSpec,
    };
    use risingwave_common::array::arrow::arrow_schema_iceberg::{
        DataType as ArrowDataType, Field as ArrowField, FieldRef as ArrowFieldRef,
        Fields as ArrowFields, Schema as ArrowSchema,
    };

    use super::*;

    #[test]
    fn test_schema_contains_same_fields_allows_binary_large_binary() {
        let current_schema = ArrowSchema::new(vec![
            ArrowField::new("k", ArrowDataType::Int32, true),
            ArrowField::new("v", ArrowDataType::LargeBinary, true),
        ]);
        let expected_fields = vec![
            ArrowField::new("k", ArrowDataType::Int32, true),
            ArrowField::new("v", ArrowDataType::Binary, true),
        ];

        assert!(schema_contains_same_fields(
            current_schema.fields(),
            &expected_fields
        ));
    }

    #[test]
    fn test_schema_contains_same_fields_allows_nested_binary_large_binary() {
        let current_schema = ArrowSchema::new(vec![
            ArrowField::new(
                "s",
                ArrowDataType::Struct(ArrowFields::from(vec![ArrowField::new(
                    "payload",
                    ArrowDataType::LargeBinary,
                    true,
                )])),
                true,
            ),
            ArrowField::new(
                "l",
                ArrowDataType::List(ArrowFieldRef::new(ArrowField::new_list_field(
                    ArrowDataType::LargeBinary,
                    true,
                ))),
                true,
            ),
            ArrowField::new_map(
                "m",
                "entries",
                ArrowFieldRef::new(ArrowField::new("key", ArrowDataType::Utf8, false)),
                ArrowFieldRef::new(ArrowField::new("value", ArrowDataType::LargeBinary, true)),
                false,
                true,
            ),
        ]);
        let expected_fields = vec![
            ArrowField::new(
                "s",
                ArrowDataType::Struct(ArrowFields::from(vec![ArrowField::new(
                    "payload",
                    ArrowDataType::Binary,
                    true,
                )])),
                true,
            ),
            ArrowField::new(
                "l",
                ArrowDataType::List(ArrowFieldRef::new(ArrowField::new_list_field(
                    ArrowDataType::Binary,
                    true,
                ))),
                true,
            ),
            ArrowField::new_map(
                "m",
                "entries",
                ArrowFieldRef::new(ArrowField::new("key", ArrowDataType::Utf8, false)),
                ArrowFieldRef::new(ArrowField::new("value", ArrowDataType::Binary, true)),
                false,
                true,
            ),
        ];

        assert!(schema_contains_same_fields(
            current_schema.fields(),
            &expected_fields
        ));
    }

    #[test]
    fn test_schema_contains_same_fields_allows_decimal_precision_delta() {
        let current_schema = ArrowSchema::new(vec![ArrowField::new(
            "d",
            ArrowDataType::Decimal128(28, 10),
            true,
        )]);
        let expected_fields = vec![ArrowField::new(
            "d",
            ArrowDataType::Decimal128(38, 10),
            true,
        )];

        assert!(schema_contains_same_fields(
            current_schema.fields(),
            &expected_fields
        ));
    }

    #[test]
    fn test_schema_contains_same_fields_allows_decimal_precision_and_scale_delta() {
        let current_schema = ArrowSchema::new(vec![ArrowField::new(
            "d",
            ArrowDataType::Decimal128(38, 2),
            true,
        )]);
        let expected_fields = vec![ArrowField::new(
            "d",
            ArrowDataType::Decimal128(38, 10),
            true,
        )];

        assert!(schema_contains_same_fields(
            current_schema.fields(),
            &expected_fields
        ));
    }

    #[test]
    fn test_schema_contains_same_fields_rejects_length_mismatch() {
        let current_schema =
            ArrowSchema::new(vec![ArrowField::new("k", ArrowDataType::Int32, true)]);
        let expected_fields = vec![
            ArrowField::new("k", ArrowDataType::Int32, true),
            ArrowField::new("v", ArrowDataType::Utf8, true),
        ];

        assert!(!schema_contains_same_fields(
            current_schema.fields(),
            &expected_fields
        ));
    }

    #[test]
    fn test_schema_contains_same_fields_rejects_type_mismatch() {
        let current_schema =
            ArrowSchema::new(vec![ArrowField::new("v", ArrowDataType::Utf8, true)]);
        let expected_fields = vec![ArrowField::new("v", ArrowDataType::Int32, true)];

        assert!(!schema_contains_same_fields(
            current_schema.fields(),
            &expected_fields
        ));
    }

    #[test]
    fn test_schema_contains_same_fields_rejects_reused_duplicate_match() {
        let current_schema = ArrowSchema::new(vec![
            ArrowField::new("v", ArrowDataType::Int32, true),
            ArrowField::new("v", ArrowDataType::Utf8, true),
        ]);
        let expected_fields = vec![
            ArrowField::new("v", ArrowDataType::Int32, true),
            ArrowField::new("v", ArrowDataType::Int32, true),
        ];

        assert!(!schema_contains_same_fields(
            current_schema.fields(),
            &expected_fields
        ));
    }

    #[test]
    fn test_schema_contains_same_fields_rejects_nested_type_mismatch() {
        let current_schema = ArrowSchema::new(vec![ArrowField::new(
            "s",
            ArrowDataType::Struct(ArrowFields::from(vec![ArrowField::new(
                "payload",
                ArrowDataType::Utf8,
                true,
            )])),
            true,
        )]);
        let expected_fields = vec![ArrowField::new(
            "s",
            ArrowDataType::Struct(ArrowFields::from(vec![ArrowField::new(
                "payload",
                ArrowDataType::Int32,
                true,
            )])),
            true,
        )];

        assert!(!schema_contains_same_fields(
            current_schema.fields(),
            &expected_fields
        ));
    }

    #[test]
    fn test_count_snapshots_since_rewrite_in_metadata_ignores_other_branches() {
        let metadata = build_metadata(
            [
                (1, None, Operation::Append),
                (2, Some(1), Operation::Append),
                (3, Some(2), Operation::Replace),
                (4, Some(3), Operation::Append),
                // Simulate the COW publish snapshot on main. It should not affect
                // the ingestion branch backlog count.
                (5, None, Operation::Overwrite),
            ],
            [(super::super::ICEBERG_COW_BRANCH, 4), (MAIN_BRANCH, 5)],
        );

        let count = IcebergSinkCommitter::count_snapshots_since_rewrite_in_metadata(
            &metadata,
            super::super::ICEBERG_COW_BRANCH,
        );

        assert_eq!(count, 1);
    }

    #[test]
    fn test_count_snapshots_since_rewrite_in_metadata_counts_lineage_without_rewrite() {
        let metadata = build_metadata(
            [
                (1, None, Operation::Append),
                (2, Some(1), Operation::Append),
                (3, Some(2), Operation::Append),
            ],
            [(MAIN_BRANCH, 3)],
        );

        let count =
            IcebergSinkCommitter::count_snapshots_since_rewrite_in_metadata(&metadata, MAIN_BRANCH);

        assert_eq!(count, 3);
    }

    #[test]
    fn test_count_snapshots_since_rewrite_in_metadata_stops_at_rewrite_head() {
        let metadata = build_metadata(
            [
                (1, None, Operation::Append),
                (2, Some(1), Operation::Append),
                (3, Some(2), Operation::Replace),
            ],
            [(MAIN_BRANCH, 3)],
        );

        let count =
            IcebergSinkCommitter::count_snapshots_since_rewrite_in_metadata(&metadata, MAIN_BRANCH);

        assert_eq!(count, 0);
    }

    #[test]
    fn test_count_snapshots_since_rewrite_in_metadata_missing_branch_is_zero() {
        let metadata = build_metadata([(1, None, Operation::Append)], [(MAIN_BRANCH, 1)]);

        let count = IcebergSinkCommitter::count_snapshots_since_rewrite_in_metadata(
            &metadata,
            "no-such-branch",
        );

        assert_eq!(count, 0);
    }

    /// Builds V2 table metadata with a single-column schema.
    ///
    /// - `snapshots`: `(snapshot_id, parent_snapshot_id, operation)` triples,
    ///   added in the given order.
    /// - `refs`: `(branch_name, snapshot_id)` pairs, set in the given order
    ///   after all snapshots are added.
    fn build_metadata(
        snapshots: impl IntoIterator<Item = (i64, Option<i64>, Operation)>,
        refs: impl IntoIterator<Item = (&'static str, i64)>,
    ) -> TableMetadata {
        let mut builder = TableMetadataBuilder::new(
            Schema::builder()
                .with_fields(vec![
                    NestedField::new(1, "id", Type::Primitive(PrimitiveType::Long), false).into(),
                ])
                .build()
                .unwrap(),
            UnboundPartitionSpec::builder().build(),
            SortOrder::unsorted_order(),
            "s3://warehouse/db/table".to_owned(),
            FormatVersion::V2,
            HashMap::new(),
        )
        .unwrap();

        for (snapshot_id, parent_snapshot_id, operation) in snapshots {
            builder = builder
                .add_snapshot(snapshot(snapshot_id, parent_snapshot_id, operation))
                .unwrap();
        }
        for (branch_name, snapshot_id) in refs {
            builder = builder
                .set_ref(branch_name, snapshot_ref(snapshot_id))
                .unwrap();
        }

        builder.build().unwrap().metadata
    }

    /// Builds a snapshot whose sequence number and timestamp both equal its id,
    /// so that snapshots added in ascending id order are accepted by the builder.
    fn snapshot(
        snapshot_id: i64,
        parent_snapshot_id: Option<i64>,
        operation: Operation,
    ) -> Snapshot {
        Snapshot::builder()
            .with_snapshot_id(snapshot_id)
            .with_parent_snapshot_id(parent_snapshot_id)
            .with_sequence_number(snapshot_id)
            .with_timestamp_ms(snapshot_id)
            .with_manifest_list(format!("/snap-{snapshot_id}.avro"))
            .with_summary(Summary {
                operation,
                additional_properties: HashMap::new(),
            })
            .with_schema_id(0)
            .build()
    }

    /// Branch reference with no retention settings pointing at `snapshot_id`.
    fn snapshot_ref(snapshot_id: i64) -> SnapshotReference {
        SnapshotReference::new(snapshot_id, SnapshotRetention::branch(None, None, None))
    }
}