risingwave_connector/sink/iceberg/
commit.rs

1// Copyright 2026 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16use std::time::Duration;
17
18use anyhow::{Context, anyhow};
19use async_trait::async_trait;
20use iceberg::arrow::schema_to_arrow_schema;
21use iceberg::spec::{DataFile, Operation, SerializedDataFile};
22use iceberg::table::Table;
23use iceberg::transaction::{ApplyTransactionAction, FastAppendAction, Transaction};
24use iceberg::{Catalog, TableIdent};
25use itertools::Itertools;
26use risingwave_common::array::arrow::arrow_schema_iceberg::Field as ArrowField;
27use risingwave_common::array::arrow::{IcebergArrowConvert, IcebergCreateTableArrowConvert};
28use risingwave_common::bail;
29use risingwave_common::catalog::Field;
30use risingwave_common::error::IcebergError;
31use risingwave_pb::connector_service::SinkMetadata;
32use risingwave_pb::connector_service::sink_metadata::Metadata::Serialized;
33use risingwave_pb::connector_service::sink_metadata::SerializedMetadata;
34use risingwave_pb::stream_plan::PbSinkSchemaChange;
35use serde_json::from_value;
36use thiserror_ext::AsReport;
37use tokio::sync::mpsc::UnboundedSender;
38use tokio_retry::RetryIf;
39use tokio_retry::strategy::{ExponentialBackoff, jitter};
40use tracing::warn;
41
42use super::{GLOBAL_SINK_METRICS, IcebergConfig, SinkError, commit_branch};
43use crate::connector_common::IcebergSinkCompactionUpdate;
44use crate::sink::catalog::SinkId;
45use crate::sink::{Result, SinglePhaseCommitCoordinator, SinkParam, TwoPhaseCommitCoordinator};
46
// JSON object keys used when (de)serializing `IcebergCommitResult` metadata.
const SCHEMA_ID: &str = "schema_id";
const PARTITION_SPEC_ID: &str = "partition_spec_id";
const DATA_FILES: &str = "data_files";

/// Per-parallelism write result collected from the sink writers for one epoch:
/// the data files produced, plus the schema / partition-spec ids they were
/// written against (used to reject schema or partition evolution at commit time).
#[derive(Default, Clone)]
pub struct IcebergCommitResult {
    /// Iceberg schema id the data files were written with.
    pub schema_id: i32,
    /// Iceberg partition spec id the data files were written with.
    pub partition_spec_id: i32,
    /// Serialized data files pending commit.
    pub data_files: Vec<SerializedDataFile>,
}
57
58impl IcebergCommitResult {
59    fn try_from(value: &SinkMetadata) -> Result<Self> {
60        if let Some(Serialized(v)) = &value.metadata {
61            let mut values = if let serde_json::Value::Object(v) =
62                serde_json::from_slice::<serde_json::Value>(&v.metadata)
63                    .context("Can't parse iceberg sink metadata")?
64            {
65                v
66            } else {
67                bail!("iceberg sink metadata should be an object");
68            };
69
70            let schema_id;
71            if let Some(serde_json::Value::Number(value)) = values.remove(SCHEMA_ID) {
72                schema_id = value
73                    .as_u64()
74                    .ok_or_else(|| anyhow!("schema_id should be a u64"))?;
75            } else {
76                bail!("iceberg sink metadata should have schema_id");
77            }
78
79            let partition_spec_id;
80            if let Some(serde_json::Value::Number(value)) = values.remove(PARTITION_SPEC_ID) {
81                partition_spec_id = value
82                    .as_u64()
83                    .ok_or_else(|| anyhow!("partition_spec_id should be a u64"))?;
84            } else {
85                bail!("iceberg sink metadata should have partition_spec_id");
86            }
87
88            let data_files: Vec<SerializedDataFile>;
89            if let serde_json::Value::Array(values) = values
90                .remove(DATA_FILES)
91                .ok_or_else(|| anyhow!("iceberg sink metadata should have data_files object"))?
92            {
93                data_files = values
94                    .into_iter()
95                    .map(from_value::<SerializedDataFile>)
96                    .collect::<std::result::Result<_, _>>()
97                    .unwrap();
98            } else {
99                bail!("iceberg sink metadata should have data_files object");
100            }
101
102            Ok(Self {
103                schema_id: schema_id as i32,
104                partition_spec_id: partition_spec_id as i32,
105                data_files,
106            })
107        } else {
108            bail!("Can't create iceberg sink write result from empty data!")
109        }
110    }
111
112    fn try_from_serialized_bytes(value: Vec<u8>) -> Result<Self> {
113        let mut values = if let serde_json::Value::Object(value) =
114            serde_json::from_slice::<serde_json::Value>(&value)
115                .context("Can't parse iceberg sink metadata")?
116        {
117            value
118        } else {
119            bail!("iceberg sink metadata should be an object");
120        };
121
122        let schema_id;
123        if let Some(serde_json::Value::Number(value)) = values.remove(SCHEMA_ID) {
124            schema_id = value
125                .as_u64()
126                .ok_or_else(|| anyhow!("schema_id should be a u64"))?;
127        } else {
128            bail!("iceberg sink metadata should have schema_id");
129        }
130
131        let partition_spec_id;
132        if let Some(serde_json::Value::Number(value)) = values.remove(PARTITION_SPEC_ID) {
133            partition_spec_id = value
134                .as_u64()
135                .ok_or_else(|| anyhow!("partition_spec_id should be a u64"))?;
136        } else {
137            bail!("iceberg sink metadata should have partition_spec_id");
138        }
139
140        let data_files: Vec<SerializedDataFile>;
141        if let serde_json::Value::Array(values) = values
142            .remove(DATA_FILES)
143            .ok_or_else(|| anyhow!("iceberg sink metadata should have data_files object"))?
144        {
145            data_files = values
146                .into_iter()
147                .map(from_value::<SerializedDataFile>)
148                .collect::<std::result::Result<_, _>>()
149                .unwrap();
150        } else {
151            bail!("iceberg sink metadata should have data_files object");
152        }
153
154        Ok(Self {
155            schema_id: schema_id as i32,
156            partition_spec_id: partition_spec_id as i32,
157            data_files,
158        })
159    }
160}
161
162impl<'a> TryFrom<&'a IcebergCommitResult> for SinkMetadata {
163    type Error = SinkError;
164
165    fn try_from(value: &'a IcebergCommitResult) -> std::result::Result<SinkMetadata, Self::Error> {
166        let json_data_files = serde_json::Value::Array(
167            value
168                .data_files
169                .iter()
170                .map(serde_json::to_value)
171                .collect::<std::result::Result<Vec<serde_json::Value>, _>>()
172                .context("Can't serialize data files to json")?,
173        );
174        let json_value = serde_json::Value::Object(
175            vec![
176                (
177                    SCHEMA_ID.to_owned(),
178                    serde_json::Value::Number(value.schema_id.into()),
179                ),
180                (
181                    PARTITION_SPEC_ID.to_owned(),
182                    serde_json::Value::Number(value.partition_spec_id.into()),
183                ),
184                (DATA_FILES.to_owned(), json_data_files),
185            ]
186            .into_iter()
187            .collect(),
188        );
189        Ok(SinkMetadata {
190            metadata: Some(Serialized(SerializedMetadata {
191                metadata: serde_json::to_vec(&json_value)
192                    .context("Can't serialize iceberg sink metadata")?,
193            })),
194        })
195    }
196}
197
198impl TryFrom<IcebergCommitResult> for Vec<u8> {
199    type Error = SinkError;
200
201    fn try_from(value: IcebergCommitResult) -> std::result::Result<Vec<u8>, Self::Error> {
202        let json_data_files = serde_json::Value::Array(
203            value
204                .data_files
205                .iter()
206                .map(serde_json::to_value)
207                .collect::<std::result::Result<Vec<serde_json::Value>, _>>()
208                .context("Can't serialize data files to json")?,
209        );
210        let json_value = serde_json::Value::Object(
211            vec![
212                (
213                    SCHEMA_ID.to_owned(),
214                    serde_json::Value::Number(value.schema_id.into()),
215                ),
216                (
217                    PARTITION_SPEC_ID.to_owned(),
218                    serde_json::Value::Number(value.partition_spec_id.into()),
219                ),
220                (DATA_FILES.to_owned(), json_data_files),
221            ]
222            .into_iter()
223            .collect(),
224        );
225        Ok(serde_json::to_vec(&json_value).context("Can't serialize iceberg sink metadata")?)
226    }
227}
/// Coordinator-side committer that turns per-writer results into Iceberg commits.
pub struct IcebergSinkCommitter {
    /// Catalog used to (re)load the table and commit transactions.
    pub(super) catalog: Arc<dyn Catalog>,
    /// Cached table handle; refreshed before each commit attempt.
    pub(super) table: Table,
    pub last_commit_epoch: u64,
    pub(crate) sink_id: SinkId,
    pub(crate) config: IcebergConfig,
    pub(crate) param: SinkParam,
    /// Max number of retries for a failed iceberg commit (see `commit_data_impl`).
    pub(super) commit_retry_num: u32,
    /// Optional channel to notify the compaction scheduler after a commit.
    pub(crate) iceberg_compact_stat_sender: Option<UnboundedSender<IcebergSinkCompactionUpdate>>,
}
238
239impl IcebergSinkCommitter {
240    // Reload table and guarantee current schema_id and partition_spec_id matches
241    // given `schema_id` and `partition_spec_id`
242    async fn reload_table(
243        catalog: &dyn Catalog,
244        table_ident: &TableIdent,
245        schema_id: i32,
246        partition_spec_id: i32,
247    ) -> Result<Table> {
248        let table = catalog
249            .load_table(table_ident)
250            .await
251            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
252        if table.metadata().current_schema_id() != schema_id {
253            return Err(SinkError::Iceberg(anyhow!(
254                "Schema evolution not supported, expect schema id {}, but got {}",
255                schema_id,
256                table.metadata().current_schema_id()
257            )));
258        }
259        if table.metadata().default_partition_spec_id() != partition_spec_id {
260            return Err(SinkError::Iceberg(anyhow!(
261                "Partition evolution not supported, expect partition spec id {}, but got {}",
262                partition_spec_id,
263                table.metadata().default_partition_spec_id()
264            )));
265        }
266        Ok(table)
267    }
268}
269
270#[async_trait]
271impl SinglePhaseCommitCoordinator for IcebergSinkCommitter {
272    async fn init(&mut self) -> Result<()> {
273        tracing::info!(
274            sink_id = %self.param.sink_id,
275            "Iceberg sink coordinator initialized",
276        );
277
278        Ok(())
279    }
280
281    async fn commit_data(&mut self, epoch: u64, metadata: Vec<SinkMetadata>) -> Result<()> {
282        tracing::debug!("Starting iceberg direct commit in epoch {epoch}");
283
284        if metadata.is_empty() {
285            tracing::debug!(?epoch, "No datafile to commit");
286            return Ok(());
287        }
288
289        // Commit data if present
290        if let Some((write_results, snapshot_id)) = self.pre_commit_inner(epoch, metadata)? {
291            self.commit_data_impl(epoch, write_results, snapshot_id)
292                .await?;
293        }
294
295        Ok(())
296    }
297
298    async fn commit_schema_change(
299        &mut self,
300        epoch: u64,
301        schema_change: PbSinkSchemaChange,
302    ) -> Result<()> {
303        tracing::info!(
304            "Committing schema change {:?} in epoch {}",
305            schema_change,
306            epoch
307        );
308        self.commit_schema_change_impl(schema_change).await?;
309        tracing::info!("Successfully committed schema change in epoch {}", epoch);
310
311        Ok(())
312    }
313}
314
#[async_trait]
impl TwoPhaseCommitCoordinator for IcebergSinkCommitter {
    async fn init(&mut self) -> Result<()> {
        tracing::info!(
            sink_id = %self.param.sink_id,
            "Iceberg sink coordinator initialized",
        );

        Ok(())
    }

    /// Phase one: decode per-writer metadata, generate a snapshot id, and
    /// serialize both into an opaque blob that is durably logged before
    /// `commit_data` runs.
    ///
    /// Blob layout (read back by `commit_data` below): a serialized list of
    /// byte strings where every element but the last is one serialized
    /// `IcebergCommitResult`, and the LAST element is the snapshot id encoded
    /// as little-endian `i64` bytes.
    async fn pre_commit(
        &mut self,
        epoch: u64,
        metadata: Vec<SinkMetadata>,
        _schema_change: Option<PbSinkSchemaChange>,
    ) -> Result<Option<Vec<u8>>> {
        tracing::debug!("Starting iceberg pre commit in epoch {epoch}");

        let (write_results, snapshot_id) = match self.pre_commit_inner(epoch, metadata)? {
            Some((write_results, snapshot_id)) => (write_results, snapshot_id),
            None => {
                tracing::debug!(?epoch, "no data to pre commit");
                return Ok(None);
            }
        };

        // Serialize each parallelism's write result as its own element.
        let mut write_results_bytes = Vec::new();
        for each_parallelism_write_result in write_results {
            let each_parallelism_write_result_bytes: Vec<u8> =
                each_parallelism_write_result.try_into()?;
            write_results_bytes.push(each_parallelism_write_result_bytes);
        }

        // Append the snapshot id LAST; `commit_data` pops it back off.
        let snapshot_id_bytes: Vec<u8> = snapshot_id.to_le_bytes().to_vec();
        write_results_bytes.push(snapshot_id_bytes);

        let pre_commit_metadata_bytes: Vec<u8> = serialize_metadata(write_results_bytes);
        Ok(Some(pre_commit_metadata_bytes))
    }

    /// Phase two: replay the blob produced by `pre_commit` and commit it.
    ///
    /// Idempotent: if the snapshot id is already present in the iceberg table
    /// (a previous attempt committed before the coordinator acked), the
    /// commit is skipped.
    async fn commit_data(&mut self, epoch: u64, commit_metadata: Vec<u8>) -> Result<()> {
        tracing::debug!("Starting iceberg commit in epoch {epoch}");

        if commit_metadata.is_empty() {
            tracing::debug!(?epoch, "No datafile to commit");
            return Ok(());
        }

        // Deserialize commit metadata
        let mut payload = deserialize_metadata(commit_metadata);
        if payload.is_empty() {
            return Err(SinkError::Iceberg(anyhow!(
                "Invalid commit metadata: empty payload"
            )));
        }

        // Last element is snapshot_id
        let snapshot_id_bytes = payload.pop().ok_or_else(|| {
            SinkError::Iceberg(anyhow!("Invalid commit metadata: missing snapshot_id"))
        })?;
        let snapshot_id = i64::from_le_bytes(
            snapshot_id_bytes
                .try_into()
                .map_err(|_| SinkError::Iceberg(anyhow!("Invalid snapshot id bytes")))?,
        );

        // Remaining elements are write_results
        let write_results = payload
            .into_iter()
            .map(IcebergCommitResult::try_from_serialized_bytes)
            .collect::<Result<Vec<_>>>()?;

        // Skip if this snapshot already landed in the table (crash recovery
        // between the iceberg commit and the coordinator ack).
        let snapshot_committed = self
            .is_snapshot_id_in_iceberg(&self.config, snapshot_id)
            .await?;

        if snapshot_committed {
            tracing::info!(
                "Snapshot id {} already committed in iceberg table, skip committing again.",
                snapshot_id
            );
            return Ok(());
        }

        self.commit_data_impl(epoch, write_results, snapshot_id)
            .await
    }

    /// Apply a schema change, skipping it when `check_schema_change_applied`
    /// reports the table already reflects it (recovery idempotency).
    async fn commit_schema_change(
        &mut self,
        epoch: u64,
        schema_change: PbSinkSchemaChange,
    ) -> Result<()> {
        let schema_updated = self.check_schema_change_applied(&schema_change)?;
        if schema_updated {
            tracing::info!("Schema change already committed in epoch {}, skip", epoch);
            return Ok(());
        }

        tracing::info!(
            "Committing schema change {:?} in epoch {}",
            schema_change,
            epoch
        );
        self.commit_schema_change_impl(schema_change).await?;
        tracing::info!("Successfully committed schema change in epoch {epoch}");

        Ok(())
    }

    async fn abort(&mut self, _epoch: u64, _commit_metadata: Vec<u8>) {
        // TODO: Files that have been written but not committed should be deleted.
        tracing::debug!("Abort not implemented yet");
    }
}
431
432/// Methods Required to Achieve Exactly Once Semantics
433impl IcebergSinkCommitter {
434    fn pre_commit_inner(
435        &mut self,
436        _epoch: u64,
437        metadata: Vec<SinkMetadata>,
438    ) -> Result<Option<(Vec<IcebergCommitResult>, i64)>> {
439        let write_results: Vec<IcebergCommitResult> = metadata
440            .iter()
441            .map(IcebergCommitResult::try_from)
442            .collect::<Result<Vec<IcebergCommitResult>>>()?;
443
444        // Skip if no data to commit
445        if write_results.is_empty() || write_results.iter().all(|r| r.data_files.is_empty()) {
446            return Ok(None);
447        }
448
449        let expect_schema_id = write_results[0].schema_id;
450        let expect_partition_spec_id = write_results[0].partition_spec_id;
451
452        // guarantee that all write results has same schema_id and partition_spec_id
453        if write_results
454            .iter()
455            .any(|r| r.schema_id != expect_schema_id)
456            || write_results
457                .iter()
458                .any(|r| r.partition_spec_id != expect_partition_spec_id)
459        {
460            return Err(SinkError::Iceberg(anyhow!(
461                "schema_id and partition_spec_id should be the same in all write results"
462            )));
463        }
464
465        let snapshot_id = FastAppendAction::generate_snapshot_id(&self.table);
466
467        Ok(Some((write_results, snapshot_id)))
468    }
469
    /// Commit the given write results to the iceberg table as one snapshot.
    ///
    /// Reloads the table (rejecting schema/partition evolution), converts the
    /// serialized data files back into `DataFile`s, then appends them in a
    /// fast-append transaction using the pre-generated `snapshot_id`,
    /// retrying commit failures with jittered exponential backoff.
    async fn commit_data_impl(
        &mut self,
        epoch: u64,
        write_results: Vec<IcebergCommitResult>,
        snapshot_id: i64,
    ) -> Result<()> {
        // Empty write results should be handled before calling this function.
        assert!(
            !write_results.is_empty() && !write_results.iter().all(|r| r.data_files.is_empty())
        );

        // Check snapshot limit before proceeding with commit
        self.wait_for_snapshot_limit().await?;

        let expect_schema_id = write_results[0].schema_id;
        let expect_partition_spec_id = write_results[0].partition_spec_id;

        // Load the latest table to avoid concurrent modification with the best effort.
        self.table = Self::reload_table(
            self.catalog.as_ref(),
            self.table.identifier(),
            expect_schema_id,
            expect_partition_spec_id,
        )
        .await?;

        let Some(schema) = self.table.metadata().schema_by_id(expect_schema_id) else {
            return Err(SinkError::Iceberg(anyhow!(
                "Can't find schema by id {}",
                expect_schema_id
            )));
        };
        let Some(partition_spec) = self
            .table
            .metadata()
            .partition_spec_by_id(expect_partition_spec_id)
        else {
            return Err(SinkError::Iceberg(anyhow!(
                "Can't find partition spec by id {}",
                expect_partition_spec_id
            )));
        };
        // The partition type is needed to deserialize the data files'
        // partition values below.
        let partition_type = partition_spec
            .as_ref()
            .clone()
            .partition_type(schema)
            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;

        // Flatten the serialized data files from all parallel writers into
        // one list of `DataFile`s.
        let data_files = write_results
            .into_iter()
            .flat_map(|r| {
                r.data_files.into_iter().map(|f| {
                    f.try_into(expect_partition_spec_id, &partition_type, schema)
                        .map_err(|err| SinkError::Iceberg(anyhow!(err)))
                })
            })
            .collect::<Result<Vec<DataFile>>>()?;
        // # TODO:
        // This retry behavior should be reverted and handled in iceberg-rust once it supports retry (tracked in: https://github.com/apache/iceberg-rust/issues/964),
        // because the retry logic involves reapplying the commit metadata.
        // For now, we just retry the commit operation.
        let retry_strategy = ExponentialBackoff::from_millis(10)
            .max_delay(Duration::from_secs(60))
            .map(jitter)
            .take(self.commit_retry_num as usize);
        let catalog = self.catalog.clone();
        let table_ident = self.table.identifier().clone();

        // Custom retry logic that:
        // 1. Calls reload_table before each commit attempt to get the latest metadata
        // 2. If reload_table fails (table not exists/schema/partition mismatch), stops retrying immediately
        // 3. If commit fails, retries with backoff
        enum CommitError {
            ReloadTable(SinkError), // Non-retriable: schema/partition mismatch
            Commit(SinkError),      // Retriable: commit conflicts, network errors
        }

        let table = RetryIf::spawn(
            retry_strategy,
            || async {
                // Reload table before each commit attempt to get the latest metadata
                let table = Self::reload_table(
                    catalog.as_ref(),
                    &table_ident,
                    expect_schema_id,
                    expect_partition_spec_id,
                )
                .await
                .map_err(|e| {
                    tracing::error!(error = %e.as_report(), "Failed to reload iceberg table");
                    CommitError::ReloadTable(e)
                })?;

                let txn = Transaction::new(&table);
                // Fast-append on the configured commit branch, reusing the
                // snapshot id generated at pre-commit so a re-commit after
                // recovery is detectable (see `is_snapshot_id_in_iceberg`).
                let append_action = txn
                    .fast_append()
                    .set_snapshot_id(snapshot_id)
                    .set_target_branch(commit_branch(
                        self.config.r#type.as_str(),
                        self.config.write_mode,
                    ))
                    .add_data_files(data_files.clone());

                let tx = append_action.apply(txn).map_err(|err| {
                    let err: IcebergError = err.into();
                    tracing::error!(error = %err.as_report(), "Failed to apply iceberg table");
                    CommitError::Commit(SinkError::Iceberg(anyhow!(err)))
                })?;

                tx.commit(catalog.as_ref()).await.map_err(|err| {
                    let err: IcebergError = err.into();
                    tracing::error!(error = %err.as_report(), "Failed to commit iceberg table");
                    CommitError::Commit(SinkError::Iceberg(anyhow!(err)))
                })
            },
            |err: &CommitError| {
                // Only retry on commit errors, not on reload_table errors
                match err {
                    CommitError::Commit(_) => {
                        tracing::warn!("Commit failed, will retry");
                        true
                    }
                    CommitError::ReloadTable(_) => {
                        tracing::error!(
                            "reload_table failed with non-retriable error, will not retry"
                        );
                        false
                    }
                }
            },
        )
        .await
        .map_err(|e| match e {
            CommitError::ReloadTable(e) | CommitError::Commit(e) => e,
        })?;
        self.table = table;

        // Export the table's snapshot count so operators can watch snapshot growth.
        let snapshot_num = self.table.metadata().snapshots().count();
        let catalog_name = self.config.common.catalog_name();
        let table_name = self.table.identifier().to_string();
        let metrics_labels = [&self.param.sink_name, &catalog_name, &table_name];
        GLOBAL_SINK_METRICS
            .iceberg_snapshot_num
            .with_guarded_label_values(&metrics_labels)
            .set(snapshot_num as i64);

        tracing::debug!("Succeeded to commit to iceberg table in epoch {epoch}.");

        // Best-effort notification of the compaction scheduler; a send
        // failure only logs a warning and never fails the commit.
        if let Some(iceberg_compact_stat_sender) = &self.iceberg_compact_stat_sender
            && self.config.enable_compaction
            && iceberg_compact_stat_sender
                .send(IcebergSinkCompactionUpdate {
                    sink_id: self.sink_id,
                    compaction_interval: self.config.compaction_interval_sec(),
                    force_compaction: false,
                })
                .is_err()
        {
            warn!("failed to send iceberg compaction stats");
        }

        Ok(())
    }
633
634    /// During pre-commit metadata, we record the `snapshot_id` corresponding to each batch of files.
635    /// Therefore, the logic for checking whether all files in this batch are present in Iceberg
636    /// has been changed to verifying if their corresponding `snapshot_id` exists in Iceberg.
637    async fn is_snapshot_id_in_iceberg(
638        &self,
639        iceberg_config: &IcebergConfig,
640        snapshot_id: i64,
641    ) -> Result<bool> {
642        let table = iceberg_config.load_table().await?;
643        if table.metadata().snapshot_by_id(snapshot_id).is_some() {
644            Ok(true)
645        } else {
646            Ok(false)
647        }
648    }
649
    /// Check if the specified columns already exist in the iceberg table's current schema.
    /// This is used to determine if schema change has already been applied.
    ///
    /// Returns `Ok(false)` when the current schema still equals the original
    /// schema, `Ok(true)` when it equals original + added columns, and an
    /// error when it matches neither (the state cannot be determined safely).
    fn check_schema_change_applied(&self, schema_change: &PbSinkSchemaChange) -> Result<bool> {
        let current_schema = self.table.metadata().current_schema();
        let current_arrow_schema = schema_to_arrow_schema(current_schema.as_ref())
            .context("Failed to convert schema")
            .map_err(SinkError::Iceberg)?;

        let iceberg_arrow_convert = IcebergArrowConvert;

        // Order-insensitive comparison: same field count, and every expected
        // field has a counterpart with the same name and data type in the
        // current schema.
        let schema_matches = |expected: &[ArrowField]| {
            if current_arrow_schema.fields().len() != expected.len() {
                return false;
            }

            expected.iter().all(|expected_field| {
                current_arrow_schema.fields().iter().any(|current_field| {
                    current_field.name() == expected_field.name()
                        && current_field.data_type() == expected_field.data_type()
                })
            })
        };

        // Convert the pre-change schema into arrow fields for comparison.
        let original_arrow_fields: Vec<ArrowField> = schema_change
            .original_schema
            .iter()
            .map(|pb_field| {
                let field = Field::from(pb_field);
                iceberg_arrow_convert
                    .to_arrow_field(&field.name, &field.data_type)
                    .context("Failed to convert field to arrow")
                    .map_err(SinkError::Iceberg)
            })
            .collect::<Result<_>>()?;

        // If current schema equals original_schema, then schema change is NOT applied.
        if schema_matches(&original_arrow_fields) {
            tracing::debug!(
                "Current iceberg schema matches original_schema ({} columns); schema change not applied",
                original_arrow_fields.len()
            );
            return Ok(false);
        }

        // We only support add_columns for now.
        let Some(risingwave_pb::stream_plan::sink_schema_change::Op::AddColumns(add_columns_op)) =
            schema_change.op.as_ref()
        else {
            return Err(SinkError::Iceberg(anyhow!(
                "Unsupported sink schema change op in iceberg sink: {:?}",
                schema_change.op
            )));
        };

        // Convert the added columns the same way for comparison.
        let add_arrow_fields: Vec<ArrowField> = add_columns_op
            .fields
            .iter()
            .map(|pb_field| {
                let field = Field::from(pb_field);
                iceberg_arrow_convert
                    .to_arrow_field(&field.name, &field.data_type)
                    .context("Failed to convert field to arrow")
                    .map_err(SinkError::Iceberg)
            })
            .collect::<Result<_>>()?;

        let mut expected_after_change = original_arrow_fields;
        expected_after_change.extend(add_arrow_fields);

        // If current schema equals original_schema + add_columns, then schema change is applied.
        if schema_matches(&expected_after_change) {
            tracing::debug!(
                "Current iceberg schema matches original_schema + add_columns ({} columns); schema change already applied",
                expected_after_change.len()
            );
            return Ok(true);
        }

        Err(SinkError::Iceberg(anyhow!(
            "Current iceberg schema does not match either original_schema ({} cols) or original_schema + add_columns; cannot determine whether schema change is applied",
            schema_change.original_schema.len()
        )))
    }
733
734    /// Commit schema changes (e.g., add columns) to the iceberg table.
735    /// This function uses Transaction API to atomically update the table schema
736    /// with optimistic locking to prevent concurrent conflicts.
737    async fn commit_schema_change_impl(&mut self, schema_change: PbSinkSchemaChange) -> Result<()> {
738        use iceberg::spec::NestedField;
739
740        let Some(risingwave_pb::stream_plan::sink_schema_change::Op::AddColumns(add_columns_op)) =
741            schema_change.op.as_ref()
742        else {
743            return Err(SinkError::Iceberg(anyhow!(
744                "Unsupported sink schema change op in iceberg sink: {:?}",
745                schema_change.op
746            )));
747        };
748
749        let add_columns = add_columns_op.fields.iter().map(Field::from).collect_vec();
750
751        // Step 1: Get current table metadata
752        let metadata = self.table.metadata();
753        let mut next_field_id = metadata.last_column_id() + 1;
754        tracing::debug!("Starting schema change, next_field_id: {}", next_field_id);
755
756        // Step 2: Build new fields to add
757        let iceberg_create_table_arrow_convert = IcebergCreateTableArrowConvert::default();
758        let mut new_fields = Vec::new();
759
760        for field in &add_columns {
761            // Convert RisingWave Field to Arrow Field using IcebergCreateTableArrowConvert
762            let arrow_field = iceberg_create_table_arrow_convert
763                .to_arrow_field(&field.name, &field.data_type)
764                .with_context(|| format!("Failed to convert field '{}' to arrow", field.name))
765                .map_err(SinkError::Iceberg)?;
766
767            // Convert Arrow DataType to Iceberg Type
768            let iceberg_type = iceberg::arrow::arrow_type_to_type(arrow_field.data_type())
769                .map_err(|err| {
770                    SinkError::Iceberg(
771                        anyhow!(err).context("Failed to convert Arrow type to Iceberg type"),
772                    )
773                })?;
774
775            // Create NestedField with the next available field ID
776            let nested_field = Arc::new(NestedField::optional(
777                next_field_id,
778                &field.name,
779                iceberg_type,
780            ));
781
782            new_fields.push(nested_field);
783            tracing::info!("Prepared field '{}' with ID {}", field.name, next_field_id);
784            next_field_id += 1;
785        }
786
787        // Step 3: Create Transaction with UpdateSchemaAction
788        tracing::info!(
789            "Committing schema change to catalog for table {}",
790            self.table.identifier()
791        );
792
793        let txn = Transaction::new(&self.table);
794        let action = txn.update_schema().add_fields(new_fields);
795
796        let updated_table = action
797            .apply(txn)
798            .context("Failed to apply schema update action")
799            .map_err(SinkError::Iceberg)?
800            .commit(self.catalog.as_ref())
801            .await
802            .context("Failed to commit table schema change")
803            .map_err(SinkError::Iceberg)?;
804
805        self.table = updated_table;
806
807        tracing::info!(
808            "Successfully committed schema change, added {} columns to iceberg table",
809            add_columns.len()
810        );
811
812        Ok(())
813    }
814
815    /// Check if the number of snapshots since the last rewrite/overwrite operation exceeds the limit
816    /// Returns the number of snapshots since the last rewrite/overwrite
817    fn count_snapshots_since_rewrite(&self) -> usize {
818        let mut snapshots: Vec<_> = self.table.metadata().snapshots().collect();
819        snapshots.sort_by_key(|b| std::cmp::Reverse(b.timestamp_ms()));
820
821        // Iterate through snapshots in reverse order (newest first) to find the last rewrite/overwrite
822        let mut count = 0;
823        for snapshot in snapshots {
824            // Check if this snapshot represents a rewrite or overwrite operation
825            let summary = snapshot.summary();
826            match &summary.operation {
827                Operation::Replace => {
828                    // Found a rewrite/overwrite operation, stop counting
829                    break;
830                }
831
832                _ => {
833                    // Increment count for each snapshot that is not a rewrite/overwrite
834                    count += 1;
835                }
836            }
837        }
838
839        count
840    }
841
842    /// Wait until snapshot count since last rewrite is below the limit
843    async fn wait_for_snapshot_limit(&mut self) -> Result<()> {
844        if !self.config.enable_compaction {
845            return Ok(());
846        }
847
848        if let Some(max_snapshots) = self.config.max_snapshots_num_before_compaction {
849            loop {
850                let current_count = self.count_snapshots_since_rewrite();
851
852                if current_count < max_snapshots {
853                    tracing::info!(
854                        "Snapshot count check passed: {} < {}",
855                        current_count,
856                        max_snapshots
857                    );
858                    break;
859                }
860
861                tracing::info!(
862                    "Snapshot count {} exceeds limit {}, waiting...",
863                    current_count,
864                    max_snapshots
865                );
866
867                if let Some(iceberg_compact_stat_sender) = &self.iceberg_compact_stat_sender
868                    && iceberg_compact_stat_sender
869                        .send(IcebergSinkCompactionUpdate {
870                            sink_id: self.sink_id,
871                            compaction_interval: self.config.compaction_interval_sec(),
872                            force_compaction: true,
873                        })
874                        .is_err()
875                {
876                    tracing::warn!("failed to send iceberg compaction stats");
877                }
878
879                // Wait for 30 seconds before checking again
880                tokio::time::sleep(Duration::from_secs(30)).await;
881
882                // Reload table to get latest snapshots
883                self.table = self.config.load_table().await?;
884            }
885        }
886        Ok(())
887    }
888}
889
890fn serialize_metadata(metadata: Vec<Vec<u8>>) -> Vec<u8> {
891    serde_json::to_vec(&metadata).unwrap()
892}
893
894fn deserialize_metadata(bytes: Vec<u8>) -> Vec<Vec<u8>> {
895    serde_json::from_slice(&bytes).unwrap()
896}