risingwave_connector/sink/iceberg/
commit.rs

1// Copyright 2026 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16use std::time::Duration;
17
18use anyhow::{Context, anyhow};
19use async_trait::async_trait;
20use iceberg::arrow::schema_to_arrow_schema;
21use iceberg::spec::{DataFile, Operation, SerializedDataFile};
22use iceberg::table::Table;
23use iceberg::transaction::{ApplyTransactionAction, FastAppendAction, Transaction};
24use iceberg::{Catalog, TableIdent};
25use itertools::Itertools;
26use risingwave_common::array::arrow::arrow_schema_iceberg::Field as ArrowField;
27use risingwave_common::array::arrow::{IcebergArrowConvert, IcebergCreateTableArrowConvert};
28use risingwave_common::bail;
29use risingwave_common::catalog::Field;
30use risingwave_common::error::IcebergError;
31use risingwave_pb::connector_service::SinkMetadata;
32use risingwave_pb::connector_service::sink_metadata::Metadata::Serialized;
33use risingwave_pb::connector_service::sink_metadata::SerializedMetadata;
34use risingwave_pb::stream_plan::PbSinkSchemaChange;
35use serde_json::from_value;
36use thiserror_ext::AsReport;
37use tokio::sync::mpsc::UnboundedSender;
38use tokio_retry::RetryIf;
39use tokio_retry::strategy::{ExponentialBackoff, jitter};
40use tracing::warn;
41
42use super::{GLOBAL_SINK_METRICS, IcebergConfig, SinkError, commit_branch};
43use crate::connector_common::IcebergSinkCompactionUpdate;
44use crate::sink::catalog::SinkId;
45use crate::sink::{Result, SinglePhaseCommitCoordinator, SinkParam, TwoPhaseCommitCoordinator};
46
/// JSON object key holding the schema id in serialized iceberg sink metadata.
const SCHEMA_ID: &str = "schema_id";
/// JSON object key holding the partition spec id in serialized iceberg sink metadata.
const PARTITION_SPEC_ID: &str = "partition_spec_id";
/// JSON object key holding the array of serialized data files in iceberg sink metadata.
const DATA_FILES: &str = "data_files";
50
/// Write result of the iceberg sink.
///
/// It is converted to and from a JSON object (carried either inside a
/// `SinkMetadata` or as raw bytes) with the keys `schema_id`,
/// `partition_spec_id` and `data_files`.
#[derive(Default, Clone)]
pub struct IcebergCommitResult {
    /// Id of the iceberg schema the data files belong to; the committer
    /// requires it to equal the table's current schema id.
    pub schema_id: i32,
    /// Id of the partition spec the data files belong to; the committer
    /// requires it to equal the table's default partition spec id.
    pub partition_spec_id: i32,
    /// Data files, in their serialized form, that the committer appends to the table.
    pub data_files: Vec<SerializedDataFile>,
}
57
58impl IcebergCommitResult {
59    fn try_from(value: &SinkMetadata) -> Result<Self> {
60        if let Some(Serialized(v)) = &value.metadata {
61            let mut values = if let serde_json::Value::Object(v) =
62                serde_json::from_slice::<serde_json::Value>(&v.metadata)
63                    .context("Can't parse iceberg sink metadata")?
64            {
65                v
66            } else {
67                bail!("iceberg sink metadata should be an object");
68            };
69
70            let schema_id;
71            if let Some(serde_json::Value::Number(value)) = values.remove(SCHEMA_ID) {
72                schema_id = value
73                    .as_u64()
74                    .ok_or_else(|| anyhow!("schema_id should be a u64"))?;
75            } else {
76                bail!("iceberg sink metadata should have schema_id");
77            }
78
79            let partition_spec_id;
80            if let Some(serde_json::Value::Number(value)) = values.remove(PARTITION_SPEC_ID) {
81                partition_spec_id = value
82                    .as_u64()
83                    .ok_or_else(|| anyhow!("partition_spec_id should be a u64"))?;
84            } else {
85                bail!("iceberg sink metadata should have partition_spec_id");
86            }
87
88            let data_files: Vec<SerializedDataFile>;
89            if let serde_json::Value::Array(values) = values
90                .remove(DATA_FILES)
91                .ok_or_else(|| anyhow!("iceberg sink metadata should have data_files object"))?
92            {
93                data_files = values
94                    .into_iter()
95                    .map(from_value::<SerializedDataFile>)
96                    .collect::<std::result::Result<_, _>>()
97                    .unwrap();
98            } else {
99                bail!("iceberg sink metadata should have data_files object");
100            }
101
102            Ok(Self {
103                schema_id: schema_id as i32,
104                partition_spec_id: partition_spec_id as i32,
105                data_files,
106            })
107        } else {
108            bail!("Can't create iceberg sink write result from empty data!")
109        }
110    }
111
112    fn try_from_serialized_bytes(value: Vec<u8>) -> Result<Self> {
113        let mut values = if let serde_json::Value::Object(value) =
114            serde_json::from_slice::<serde_json::Value>(&value)
115                .context("Can't parse iceberg sink metadata")?
116        {
117            value
118        } else {
119            bail!("iceberg sink metadata should be an object");
120        };
121
122        let schema_id;
123        if let Some(serde_json::Value::Number(value)) = values.remove(SCHEMA_ID) {
124            schema_id = value
125                .as_u64()
126                .ok_or_else(|| anyhow!("schema_id should be a u64"))?;
127        } else {
128            bail!("iceberg sink metadata should have schema_id");
129        }
130
131        let partition_spec_id;
132        if let Some(serde_json::Value::Number(value)) = values.remove(PARTITION_SPEC_ID) {
133            partition_spec_id = value
134                .as_u64()
135                .ok_or_else(|| anyhow!("partition_spec_id should be a u64"))?;
136        } else {
137            bail!("iceberg sink metadata should have partition_spec_id");
138        }
139
140        let data_files: Vec<SerializedDataFile>;
141        if let serde_json::Value::Array(values) = values
142            .remove(DATA_FILES)
143            .ok_or_else(|| anyhow!("iceberg sink metadata should have data_files object"))?
144        {
145            data_files = values
146                .into_iter()
147                .map(from_value::<SerializedDataFile>)
148                .collect::<std::result::Result<_, _>>()
149                .unwrap();
150        } else {
151            bail!("iceberg sink metadata should have data_files object");
152        }
153
154        Ok(Self {
155            schema_id: schema_id as i32,
156            partition_spec_id: partition_spec_id as i32,
157            data_files,
158        })
159    }
160}
161
162impl<'a> TryFrom<&'a IcebergCommitResult> for SinkMetadata {
163    type Error = SinkError;
164
165    fn try_from(value: &'a IcebergCommitResult) -> std::result::Result<SinkMetadata, Self::Error> {
166        let json_data_files = serde_json::Value::Array(
167            value
168                .data_files
169                .iter()
170                .map(serde_json::to_value)
171                .collect::<std::result::Result<Vec<serde_json::Value>, _>>()
172                .context("Can't serialize data files to json")?,
173        );
174        let json_value = serde_json::Value::Object(
175            vec![
176                (
177                    SCHEMA_ID.to_owned(),
178                    serde_json::Value::Number(value.schema_id.into()),
179                ),
180                (
181                    PARTITION_SPEC_ID.to_owned(),
182                    serde_json::Value::Number(value.partition_spec_id.into()),
183                ),
184                (DATA_FILES.to_owned(), json_data_files),
185            ]
186            .into_iter()
187            .collect(),
188        );
189        Ok(SinkMetadata {
190            metadata: Some(Serialized(SerializedMetadata {
191                metadata: serde_json::to_vec(&json_value)
192                    .context("Can't serialize iceberg sink metadata")?,
193            })),
194        })
195    }
196}
197
198impl TryFrom<IcebergCommitResult> for Vec<u8> {
199    type Error = SinkError;
200
201    fn try_from(value: IcebergCommitResult) -> std::result::Result<Vec<u8>, Self::Error> {
202        let json_data_files = serde_json::Value::Array(
203            value
204                .data_files
205                .iter()
206                .map(serde_json::to_value)
207                .collect::<std::result::Result<Vec<serde_json::Value>, _>>()
208                .context("Can't serialize data files to json")?,
209        );
210        let json_value = serde_json::Value::Object(
211            vec![
212                (
213                    SCHEMA_ID.to_owned(),
214                    serde_json::Value::Number(value.schema_id.into()),
215                ),
216                (
217                    PARTITION_SPEC_ID.to_owned(),
218                    serde_json::Value::Number(value.partition_spec_id.into()),
219                ),
220                (DATA_FILES.to_owned(), json_data_files),
221            ]
222            .into_iter()
223            .collect(),
224        );
225        Ok(serde_json::to_vec(&json_value).context("Can't serialize iceberg sink metadata")?)
226    }
227}
/// Commit coordinator of the iceberg sink.
///
/// It implements both `SinglePhaseCommitCoordinator` and
/// `TwoPhaseCommitCoordinator`, and appends the data files reported by the
/// writers to the iceberg table.
pub struct IcebergSinkCommitter {
    /// Catalog used to (re)load the table and to commit transactions.
    pub(super) catalog: Arc<dyn Catalog>,
    /// Most recently loaded table; replaced on every reload and after each successful commit.
    pub(super) table: Table,
    // NOTE(review): not read or written by the code visible in this file section —
    // presumably the epoch of the last successful commit; confirm against callers.
    pub last_commit_epoch: u64,
    /// Id of the sink, attached to compaction update notifications.
    pub(crate) sink_id: SinkId,
    /// Sink configuration; used for the target branch, the catalog name and for loading the table.
    pub(crate) config: IcebergConfig,
    /// Sink parameters; `sink_id` and `sink_name` are used for logging and metric labels.
    pub(crate) param: SinkParam,
    /// Maximum number of backoff delays (i.e. retries) taken when a commit attempt fails.
    pub(super) commit_retry_num: u32,
    /// Optional channel that receives an `IcebergSinkCompactionUpdate` after each successful commit.
    pub(crate) iceberg_compact_stat_sender: Option<UnboundedSender<IcebergSinkCompactionUpdate>>,
}
238
239impl IcebergSinkCommitter {
240    // Reload table and guarantee current schema_id and partition_spec_id matches
241    // given `schema_id` and `partition_spec_id`
242    async fn reload_table(
243        catalog: &dyn Catalog,
244        table_ident: &TableIdent,
245        schema_id: i32,
246        partition_spec_id: i32,
247    ) -> Result<Table> {
248        let table = catalog
249            .load_table(table_ident)
250            .await
251            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
252        if table.metadata().current_schema_id() != schema_id {
253            return Err(SinkError::Iceberg(anyhow!(
254                "Schema evolution not supported, expect schema id {}, but got {}",
255                schema_id,
256                table.metadata().current_schema_id()
257            )));
258        }
259        if table.metadata().default_partition_spec_id() != partition_spec_id {
260            return Err(SinkError::Iceberg(anyhow!(
261                "Partition evolution not supported, expect partition spec id {}, but got {}",
262                partition_spec_id,
263                table.metadata().default_partition_spec_id()
264            )));
265        }
266        Ok(table)
267    }
268}
269
270#[async_trait]
271impl SinglePhaseCommitCoordinator for IcebergSinkCommitter {
272    async fn init(&mut self) -> Result<()> {
273        tracing::info!(
274            sink_id = %self.param.sink_id,
275            "Iceberg sink coordinator initialized",
276        );
277
278        Ok(())
279    }
280
281    async fn commit_data(&mut self, epoch: u64, metadata: Vec<SinkMetadata>) -> Result<()> {
282        tracing::debug!("Starting iceberg direct commit in epoch {epoch}");
283
284        if metadata.is_empty() {
285            tracing::debug!(?epoch, "No datafile to commit");
286            return Ok(());
287        }
288
289        // Commit data if present
290        if let Some((write_results, snapshot_id)) = self.pre_commit_inner(epoch, metadata)? {
291            self.commit_data_impl(epoch, write_results, snapshot_id)
292                .await?;
293        }
294
295        Ok(())
296    }
297
298    async fn commit_schema_change(
299        &mut self,
300        epoch: u64,
301        schema_change: PbSinkSchemaChange,
302    ) -> Result<()> {
303        tracing::info!(
304            "Committing schema change {:?} in epoch {}",
305            schema_change,
306            epoch
307        );
308        self.commit_schema_change_impl(schema_change).await?;
309        tracing::info!("Successfully committed schema change in epoch {}", epoch);
310
311        Ok(())
312    }
313}
314
315#[async_trait]
316impl TwoPhaseCommitCoordinator for IcebergSinkCommitter {
317    async fn init(&mut self) -> Result<()> {
318        tracing::info!(
319            sink_id = %self.param.sink_id,
320            "Iceberg sink coordinator initialized",
321        );
322
323        Ok(())
324    }
325
326    async fn pre_commit(
327        &mut self,
328        epoch: u64,
329        metadata: Vec<SinkMetadata>,
330        _schema_change: Option<PbSinkSchemaChange>,
331    ) -> Result<Option<Vec<u8>>> {
332        tracing::debug!("Starting iceberg pre commit in epoch {epoch}");
333
334        let (write_results, snapshot_id) = match self.pre_commit_inner(epoch, metadata)? {
335            Some((write_results, snapshot_id)) => (write_results, snapshot_id),
336            None => {
337                tracing::debug!(?epoch, "no data to pre commit");
338                return Ok(None);
339            }
340        };
341
342        let mut write_results_bytes = Vec::new();
343        for each_parallelism_write_result in write_results {
344            let each_parallelism_write_result_bytes: Vec<u8> =
345                each_parallelism_write_result.try_into()?;
346            write_results_bytes.push(each_parallelism_write_result_bytes);
347        }
348
349        let snapshot_id_bytes: Vec<u8> = snapshot_id.to_le_bytes().to_vec();
350        write_results_bytes.push(snapshot_id_bytes);
351
352        let pre_commit_metadata_bytes: Vec<u8> = serialize_metadata(write_results_bytes);
353        Ok(Some(pre_commit_metadata_bytes))
354    }
355
356    async fn commit_data(&mut self, epoch: u64, commit_metadata: Vec<u8>) -> Result<()> {
357        tracing::debug!("Starting iceberg commit in epoch {epoch}");
358
359        if commit_metadata.is_empty() {
360            tracing::debug!(?epoch, "No datafile to commit");
361            return Ok(());
362        }
363
364        // Deserialize commit metadata
365        let mut payload = deserialize_metadata(commit_metadata);
366        if payload.is_empty() {
367            return Err(SinkError::Iceberg(anyhow!(
368                "Invalid commit metadata: empty payload"
369            )));
370        }
371
372        // Last element is snapshot_id
373        let snapshot_id_bytes = payload.pop().ok_or_else(|| {
374            SinkError::Iceberg(anyhow!("Invalid commit metadata: missing snapshot_id"))
375        })?;
376        let snapshot_id = i64::from_le_bytes(
377            snapshot_id_bytes
378                .try_into()
379                .map_err(|_| SinkError::Iceberg(anyhow!("Invalid snapshot id bytes")))?,
380        );
381
382        // Remaining elements are write_results
383        let write_results = payload
384            .into_iter()
385            .map(IcebergCommitResult::try_from_serialized_bytes)
386            .collect::<Result<Vec<_>>>()?;
387
388        let snapshot_committed = self
389            .is_snapshot_id_in_iceberg(&self.config, snapshot_id)
390            .await?;
391
392        if snapshot_committed {
393            tracing::info!(
394                "Snapshot id {} already committed in iceberg table, skip committing again.",
395                snapshot_id
396            );
397            return Ok(());
398        }
399
400        self.commit_data_impl(epoch, write_results, snapshot_id)
401            .await
402    }
403
404    async fn commit_schema_change(
405        &mut self,
406        epoch: u64,
407        schema_change: PbSinkSchemaChange,
408    ) -> Result<()> {
409        let schema_updated = self.check_schema_change_applied(&schema_change)?;
410        if schema_updated {
411            tracing::info!("Schema change already committed in epoch {}, skip", epoch);
412            return Ok(());
413        }
414
415        tracing::info!(
416            "Committing schema change {:?} in epoch {}",
417            schema_change,
418            epoch
419        );
420        self.commit_schema_change_impl(schema_change).await?;
421        tracing::info!("Successfully committed schema change in epoch {epoch}");
422
423        Ok(())
424    }
425
426    async fn abort(&mut self, _epoch: u64, _commit_metadata: Vec<u8>) {
427        // TODO: Files that have been written but not committed should be deleted.
428        tracing::debug!("Abort not implemented yet");
429    }
430}
431
432/// Methods Required to Achieve Exactly Once Semantics
433impl IcebergSinkCommitter {
434    fn pre_commit_inner(
435        &mut self,
436        _epoch: u64,
437        metadata: Vec<SinkMetadata>,
438    ) -> Result<Option<(Vec<IcebergCommitResult>, i64)>> {
439        let write_results: Vec<IcebergCommitResult> = metadata
440            .iter()
441            .map(IcebergCommitResult::try_from)
442            .collect::<Result<Vec<IcebergCommitResult>>>()?;
443
444        // Skip if no data to commit
445        if write_results.is_empty() || write_results.iter().all(|r| r.data_files.is_empty()) {
446            return Ok(None);
447        }
448
449        let expect_schema_id = write_results[0].schema_id;
450        let expect_partition_spec_id = write_results[0].partition_spec_id;
451
452        // guarantee that all write results has same schema_id and partition_spec_id
453        if write_results
454            .iter()
455            .any(|r| r.schema_id != expect_schema_id)
456            || write_results
457                .iter()
458                .any(|r| r.partition_spec_id != expect_partition_spec_id)
459        {
460            return Err(SinkError::Iceberg(anyhow!(
461                "schema_id and partition_spec_id should be the same in all write results"
462            )));
463        }
464
465        let snapshot_id = FastAppendAction::generate_snapshot_id(&self.table);
466
467        Ok(Some((write_results, snapshot_id)))
468    }
469
    /// Appends the data files in `write_results` to the iceberg table as the
    /// snapshot `snapshot_id`.
    ///
    /// Steps, in order:
    /// 1. call `wait_for_snapshot_limit`;
    /// 2. reload the table and check that its schema / partition spec ids match
    ///    those of the first write result;
    /// 3. convert every serialized data file into a `DataFile`;
    /// 4. fast-append the files, reloading the table before each attempt and
    ///    retrying failed commits with jittered exponential backoff;
    /// 5. update the snapshot-count metric and notify the compaction channel, if any.
    ///
    /// # Panics
    ///
    /// Panics if `write_results` is empty or contains no data file at all;
    /// callers must filter that case out beforehand.
    ///
    /// # Errors
    ///
    /// Returns an error if the snapshot-limit wait fails, the table cannot be
    /// reloaded or no longer matches the expected ids, a data file cannot be
    /// converted, or the commit still fails after the configured retries.
    async fn commit_data_impl(
        &mut self,
        epoch: u64,
        write_results: Vec<IcebergCommitResult>,
        snapshot_id: i64,
    ) -> Result<()> {
        // Empty write results should be handled before calling this function.
        assert!(
            !write_results.is_empty() && !write_results.iter().all(|r| r.data_files.is_empty())
        );

        // Check snapshot limit before proceeding with commit
        self.wait_for_snapshot_limit().await?;

        // The first write result defines the ids every file is expected to use.
        let expect_schema_id = write_results[0].schema_id;
        let expect_partition_spec_id = write_results[0].partition_spec_id;

        // Load the latest table to avoid concurrent modification with the best effort.
        self.table = Self::reload_table(
            self.catalog.as_ref(),
            self.table.identifier(),
            expect_schema_id,
            expect_partition_spec_id,
        )
        .await?;

        let Some(schema) = self.table.metadata().schema_by_id(expect_schema_id) else {
            return Err(SinkError::Iceberg(anyhow!(
                "Can't find schema by id {}",
                expect_schema_id
            )));
        };
        let Some(partition_spec) = self
            .table
            .metadata()
            .partition_spec_by_id(expect_partition_spec_id)
        else {
            return Err(SinkError::Iceberg(anyhow!(
                "Can't find partition spec by id {}",
                expect_partition_spec_id
            )));
        };
        let partition_type = partition_spec
            .as_ref()
            .clone()
            .partition_type(schema)
            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;

        // Flatten the files of all write results; the first conversion failure aborts the commit.
        let data_files = write_results
            .into_iter()
            .flat_map(|r| {
                r.data_files.into_iter().map(|f| {
                    f.try_into(expect_partition_spec_id, &partition_type, schema)
                        .map_err(|err| SinkError::Iceberg(anyhow!(err)))
                })
            })
            .collect::<Result<Vec<DataFile>>>()?;
        // # TODO:
        // This retry behavior should be reverted and done in iceberg-rust once it supports retry
        // (tracked in: https://github.com/apache/iceberg-rust/issues/964),
        // because the retry logic involves re-applying the commit metadata.
        // For now, we just retry the commit operation.
        //
        // Delays grow exponentially, are capped at 60 seconds each, are jittered,
        // and at most `commit_retry_num` of them are taken.
        let retry_strategy = ExponentialBackoff::from_millis(10)
            .max_delay(Duration::from_secs(60))
            .map(jitter)
            .take(self.commit_retry_num as usize);
        let catalog = self.catalog.clone();
        let table_ident = self.table.identifier().clone();

        // Custom retry logic that:
        // 1. Calls reload_table before each commit attempt to get the latest metadata
        // 2. If reload_table fails (table not exists/schema/partition mismatch), stops retrying immediately
        // 3. If commit fails, retries with backoff
        enum CommitError {
            ReloadTable(SinkError), // Non-retriable: schema/partition mismatch
            Commit(SinkError),      // Retriable: commit conflicts, network errors
        }

        let table = RetryIf::spawn(
            retry_strategy,
            || async {
                // Reload table before each commit attempt to get the latest metadata
                let table = Self::reload_table(
                    catalog.as_ref(),
                    &table_ident,
                    expect_schema_id,
                    expect_partition_spec_id,
                )
                .await
                .map_err(|e| {
                    tracing::error!(error = %e.as_report(), "Failed to reload iceberg table");
                    CommitError::ReloadTable(e)
                })?;

                // Every attempt reuses the same snapshot id and the same set of data files.
                let txn = Transaction::new(&table);
                let append_action = txn
                    .fast_append()
                    .set_snapshot_id(snapshot_id)
                    .set_target_branch(commit_branch(
                        self.config.r#type.as_str(),
                        self.config.write_mode,
                    ))
                    .add_data_files(data_files.clone());

                let tx = append_action.apply(txn).map_err(|err| {
                    let err: IcebergError = err.into();
                    tracing::error!(error = %err.as_report(), "Failed to apply iceberg table");
                    CommitError::Commit(SinkError::Iceberg(anyhow!(err)))
                })?;

                tx.commit(catalog.as_ref()).await.map_err(|err| {
                    let err: IcebergError = err.into();
                    tracing::error!(error = %err.as_report(), "Failed to commit iceberg table");
                    CommitError::Commit(SinkError::Iceberg(anyhow!(err)))
                })
            },
            |err: &CommitError| {
                // Only retry on commit errors, not on reload_table errors
                match err {
                    CommitError::Commit(_) => {
                        tracing::warn!("Commit failed, will retry");
                        true
                    }
                    CommitError::ReloadTable(_) => {
                        tracing::error!(
                            "reload_table failed with non-retriable error, will not retry"
                        );
                        false
                    }
                }
            },
        )
        .await
        // Both variants wrap a `SinkError`; unwrap it for the caller.
        .map_err(|e| match e {
            CommitError::ReloadTable(e) | CommitError::Commit(e) => e,
        })?;
        // Keep the table returned by the successful commit as the cached table.
        self.table = table;

        // Report the number of snapshots now present in the table metadata.
        let snapshot_num = self.table.metadata().snapshots().count();
        let catalog_name = self.config.common.catalog_name();
        let table_name = self.table.identifier().to_string();
        let metrics_labels = [&self.param.sink_name, &catalog_name, &table_name];
        GLOBAL_SINK_METRICS
            .iceberg_snapshot_num
            .with_guarded_label_values(&metrics_labels)
            .set(snapshot_num as i64);

        tracing::debug!("Succeeded to commit to iceberg table in epoch {epoch}.");

        // Best effort: a failed send is only logged and does not fail the commit.
        if let Some(iceberg_compact_stat_sender) = &self.iceberg_compact_stat_sender
            && iceberg_compact_stat_sender
                .send(IcebergSinkCompactionUpdate {
                    sink_id: self.sink_id,
                    force_compaction: false,
                })
                .is_err()
        {
            warn!("failed to send iceberg compaction stats");
        }

        Ok(())
    }
631
632    /// During pre-commit metadata, we record the `snapshot_id` corresponding to each batch of files.
633    /// Therefore, the logic for checking whether all files in this batch are present in Iceberg
634    /// has been changed to verifying if their corresponding `snapshot_id` exists in Iceberg.
635    async fn is_snapshot_id_in_iceberg(
636        &self,
637        iceberg_config: &IcebergConfig,
638        snapshot_id: i64,
639    ) -> Result<bool> {
640        let table = iceberg_config.load_table().await?;
641        if table.metadata().snapshot_by_id(snapshot_id).is_some() {
642            Ok(true)
643        } else {
644            Ok(false)
645        }
646    }
647
648    /// Check if the specified columns already exist in the iceberg table's current schema.
649    /// This is used to determine if schema change has already been applied.
650    fn check_schema_change_applied(&self, schema_change: &PbSinkSchemaChange) -> Result<bool> {
651        let current_schema = self.table.metadata().current_schema();
652        let current_arrow_schema = schema_to_arrow_schema(current_schema.as_ref())
653            .context("Failed to convert schema")
654            .map_err(SinkError::Iceberg)?;
655
656        let iceberg_arrow_convert = IcebergArrowConvert;
657
658        let schema_matches = |expected: &[ArrowField]| {
659            if current_arrow_schema.fields().len() != expected.len() {
660                return false;
661            }
662
663            expected.iter().all(|expected_field| {
664                current_arrow_schema.fields().iter().any(|current_field| {
665                    current_field.name() == expected_field.name()
666                        && current_field.data_type() == expected_field.data_type()
667                })
668            })
669        };
670
671        let original_arrow_fields: Vec<ArrowField> = schema_change
672            .original_schema
673            .iter()
674            .map(|pb_field| {
675                let field = Field::from(pb_field);
676                iceberg_arrow_convert
677                    .to_arrow_field(&field.name, &field.data_type)
678                    .context("Failed to convert field to arrow")
679                    .map_err(SinkError::Iceberg)
680            })
681            .collect::<Result<_>>()?;
682
683        // If current schema equals original_schema, then schema change is NOT applied.
684        if schema_matches(&original_arrow_fields) {
685            tracing::debug!(
686                "Current iceberg schema matches original_schema ({} columns); schema change not applied",
687                original_arrow_fields.len()
688            );
689            return Ok(false);
690        }
691
692        // We only support add_columns for now.
693        let Some(risingwave_pb::stream_plan::sink_schema_change::Op::AddColumns(add_columns_op)) =
694            schema_change.op.as_ref()
695        else {
696            return Err(SinkError::Iceberg(anyhow!(
697                "Unsupported sink schema change op in iceberg sink: {:?}",
698                schema_change.op
699            )));
700        };
701
702        let add_arrow_fields: Vec<ArrowField> = add_columns_op
703            .fields
704            .iter()
705            .map(|pb_field| {
706                let field = Field::from(pb_field);
707                iceberg_arrow_convert
708                    .to_arrow_field(&field.name, &field.data_type)
709                    .context("Failed to convert field to arrow")
710                    .map_err(SinkError::Iceberg)
711            })
712            .collect::<Result<_>>()?;
713
714        let mut expected_after_change = original_arrow_fields;
715        expected_after_change.extend(add_arrow_fields);
716
717        // If current schema equals original_schema + add_columns, then schema change is applied.
718        if schema_matches(&expected_after_change) {
719            tracing::debug!(
720                "Current iceberg schema matches original_schema + add_columns ({} columns); schema change already applied",
721                expected_after_change.len()
722            );
723            return Ok(true);
724        }
725
726        Err(SinkError::Iceberg(anyhow!(
727            "Current iceberg schema does not match either original_schema ({} cols) or original_schema + add_columns; cannot determine whether schema change is applied",
728            schema_change.original_schema.len()
729        )))
730    }
731
732    /// Commit schema changes (e.g., add columns) to the iceberg table.
733    /// This function uses Transaction API to atomically update the table schema
734    /// with optimistic locking to prevent concurrent conflicts.
735    async fn commit_schema_change_impl(&mut self, schema_change: PbSinkSchemaChange) -> Result<()> {
736        use iceberg::spec::NestedField;
737
738        let Some(risingwave_pb::stream_plan::sink_schema_change::Op::AddColumns(add_columns_op)) =
739            schema_change.op.as_ref()
740        else {
741            return Err(SinkError::Iceberg(anyhow!(
742                "Unsupported sink schema change op in iceberg sink: {:?}",
743                schema_change.op
744            )));
745        };
746
747        let add_columns = add_columns_op.fields.iter().map(Field::from).collect_vec();
748
749        // Step 1: Get current table metadata
750        let metadata = self.table.metadata();
751        let mut next_field_id = metadata.last_column_id() + 1;
752        tracing::debug!("Starting schema change, next_field_id: {}", next_field_id);
753
754        // Step 2: Build new fields to add
755        let iceberg_create_table_arrow_convert = IcebergCreateTableArrowConvert::default();
756        let mut new_fields = Vec::new();
757
758        for field in &add_columns {
759            // Convert RisingWave Field to Arrow Field using IcebergCreateTableArrowConvert
760            let arrow_field = iceberg_create_table_arrow_convert
761                .to_arrow_field(&field.name, &field.data_type)
762                .with_context(|| format!("Failed to convert field '{}' to arrow", field.name))
763                .map_err(SinkError::Iceberg)?;
764
765            // Convert Arrow DataType to Iceberg Type
766            let iceberg_type = iceberg::arrow::arrow_type_to_type(arrow_field.data_type())
767                .map_err(|err| {
768                    SinkError::Iceberg(
769                        anyhow!(err).context("Failed to convert Arrow type to Iceberg type"),
770                    )
771                })?;
772
773            // Create NestedField with the next available field ID
774            let nested_field = Arc::new(NestedField::optional(
775                next_field_id,
776                &field.name,
777                iceberg_type,
778            ));
779
780            new_fields.push(nested_field);
781            tracing::info!("Prepared field '{}' with ID {}", field.name, next_field_id);
782            next_field_id += 1;
783        }
784
785        // Step 3: Create Transaction with UpdateSchemaAction
786        tracing::info!(
787            "Committing schema change to catalog for table {}",
788            self.table.identifier()
789        );
790
791        let txn = Transaction::new(&self.table);
792        let action = txn.update_schema().add_fields(new_fields);
793
794        let updated_table = action
795            .apply(txn)
796            .context("Failed to apply schema update action")
797            .map_err(SinkError::Iceberg)?
798            .commit(self.catalog.as_ref())
799            .await
800            .context("Failed to commit table schema change")
801            .map_err(SinkError::Iceberg)?;
802
803        self.table = updated_table;
804
805        tracing::info!(
806            "Successfully committed schema change, added {} columns to iceberg table",
807            add_columns.len()
808        );
809
810        Ok(())
811    }
812
813    /// Check if the number of snapshots since the last rewrite/overwrite operation exceeds the limit
814    /// Returns the number of snapshots since the last rewrite/overwrite
815    fn count_snapshots_since_rewrite(&self) -> usize {
816        let mut snapshots: Vec<_> = self.table.metadata().snapshots().collect();
817        snapshots.sort_by_key(|b| std::cmp::Reverse(b.timestamp_ms()));
818
819        // Iterate through snapshots in reverse order (newest first) to find the last rewrite/overwrite
820        let mut count = 0;
821        for snapshot in snapshots {
822            // Check if this snapshot represents a rewrite or overwrite operation
823            let summary = snapshot.summary();
824            match &summary.operation {
825                Operation::Replace => {
826                    // Found a rewrite/overwrite operation, stop counting
827                    break;
828                }
829
830                _ => {
831                    // Increment count for each snapshot that is not a rewrite/overwrite
832                    count += 1;
833                }
834            }
835        }
836
837        count
838    }
839
840    /// Wait until snapshot count since last rewrite is below the limit
841    async fn wait_for_snapshot_limit(&mut self) -> Result<()> {
842        if !self.config.enable_compaction {
843            return Ok(());
844        }
845
846        if let Some(max_snapshots) = self.config.max_snapshots_num_before_compaction {
847            loop {
848                let current_count = self.count_snapshots_since_rewrite();
849
850                if current_count < max_snapshots {
851                    tracing::info!(
852                        "Snapshot count check passed: {} < {}",
853                        current_count,
854                        max_snapshots
855                    );
856                    break;
857                }
858
859                tracing::info!(
860                    "Snapshot count {} exceeds limit {}, waiting...",
861                    current_count,
862                    max_snapshots
863                );
864
865                if let Some(iceberg_compact_stat_sender) = &self.iceberg_compact_stat_sender
866                    && iceberg_compact_stat_sender
867                        .send(IcebergSinkCompactionUpdate {
868                            sink_id: self.sink_id,
869                            force_compaction: true,
870                        })
871                        .is_err()
872                {
873                    tracing::warn!("failed to send iceberg compaction stats");
874                }
875
876                // Wait for 30 seconds before checking again
877                tokio::time::sleep(Duration::from_secs(30)).await;
878
879                // Reload table to get latest snapshots
880                self.table = self.config.load_table().await?;
881            }
882        }
883        Ok(())
884    }
885}
886
887fn serialize_metadata(metadata: Vec<Vec<u8>>) -> Vec<u8> {
888    serde_json::to_vec(&metadata).unwrap()
889}
890
891fn deserialize_metadata(bytes: Vec<u8>) -> Vec<Vec<u8>> {
892    serde_json::from_slice(&bytes).unwrap()
893}