risingwave_connector/sink/iceberg/
mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub mod exactly_once_util;
16mod prometheus;
17use std::collections::{BTreeMap, HashMap};
18use std::fmt::Debug;
19use std::num::NonZeroU64;
20use std::str::FromStr;
21use std::sync::Arc;
22use std::time::Duration;
23
24use anyhow::{Context, anyhow};
25use async_trait::async_trait;
26use await_tree::InstrumentAwait;
27use iceberg::arrow::{arrow_schema_to_schema, schema_to_arrow_schema};
28use iceberg::spec::{
29    DataFile, MAIN_BRANCH, Operation, SerializedDataFile, Transform, UnboundPartitionField,
30    UnboundPartitionSpec,
31};
32use iceberg::table::Table;
33use iceberg::transaction::Transaction;
34use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
35use iceberg::writer::base_writer::equality_delete_writer::{
36    EqualityDeleteFileWriterBuilder, EqualityDeleteWriterConfig,
37};
38use iceberg::writer::base_writer::sort_position_delete_writer::{
39    POSITION_DELETE_SCHEMA, SortPositionDeleteWriterBuilder,
40};
41use iceberg::writer::file_writer::ParquetWriterBuilder;
42use iceberg::writer::file_writer::location_generator::{
43    DefaultFileNameGenerator, DefaultLocationGenerator,
44};
45use iceberg::writer::function_writer::equality_delta_writer::{
46    DELETE_OP, EqualityDeltaWriterBuilder, INSERT_OP,
47};
48use iceberg::writer::function_writer::fanout_partition_writer::FanoutPartitionWriterBuilder;
49use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
50use iceberg::{Catalog, NamespaceIdent, TableCreation, TableIdent};
51use itertools::Itertools;
52use parquet::file::properties::WriterProperties;
53use prometheus::monitored_general_writer::MonitoredGeneralWriterBuilder;
54use prometheus::monitored_position_delete_writer::MonitoredPositionDeleteWriterBuilder;
55use regex::Regex;
56use risingwave_common::array::arrow::arrow_array_iceberg::{Int32Array, RecordBatch};
57use risingwave_common::array::arrow::arrow_schema_iceberg::{
58    self, DataType as ArrowDataType, Field as ArrowField, Fields as ArrowFields,
59    Schema as ArrowSchema, SchemaRef,
60};
61use risingwave_common::array::arrow::{IcebergArrowConvert, IcebergCreateTableArrowConvert};
62use risingwave_common::array::{Op, StreamChunk};
63use risingwave_common::bail;
64use risingwave_common::bitmap::Bitmap;
65use risingwave_common::catalog::{Field, Schema};
66use risingwave_common::metrics::{LabelGuardedHistogram, LabelGuardedIntCounter};
67use risingwave_common_estimate_size::EstimateSize;
68use risingwave_pb::connector_service::SinkMetadata;
69use risingwave_pb::connector_service::sink_metadata::Metadata::Serialized;
70use risingwave_pb::connector_service::sink_metadata::SerializedMetadata;
71use sea_orm::DatabaseConnection;
72use serde::Deserialize;
73use serde_json::from_value;
74use serde_with::{DisplayFromStr, serde_as};
75use thiserror_ext::AsReport;
76use tokio::sync::mpsc::UnboundedSender;
77use tokio_retry::Retry;
78use tokio_retry::strategy::{ExponentialBackoff, jitter};
79use tracing::warn;
80use url::Url;
81use uuid::Uuid;
82use with_options::WithOptions;
83
84use super::decouple_checkpoint_log_sink::iceberg_default_commit_checkpoint_interval;
85use super::{
86    GLOBAL_SINK_METRICS, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT, Sink,
87    SinkCommittedEpochSubscriber, SinkError, SinkWriterParam,
88};
89use crate::connector_common::{IcebergCommon, IcebergSinkCompactionUpdate, IcebergTableIdentifier};
90use crate::enforce_secret::EnforceSecret;
91use crate::sink::catalog::SinkId;
92use crate::sink::coordinate::CoordinatedLogSinker;
93use crate::sink::iceberg::exactly_once_util::*;
94use crate::sink::writer::SinkWriter;
95use crate::sink::{Result, SinkCommitCoordinator, SinkParam};
96use crate::{deserialize_bool_from_string, deserialize_optional_string_seq_from_string};
97
98pub const ICEBERG_SINK: &str = "iceberg";
99pub const ICEBERG_COW_BRANCH: &str = "ingestion";
100pub const ICEBERG_WRITE_MODE_MERGE_ON_READ: &str = "merge-on-read";
101pub const ICEBERG_WRITE_MODE_COPY_ON_WRITE: &str = "copy-on-write";
102
103// Configuration constants
104pub const ENABLE_COMPACTION: &str = "enable_compaction";
105pub const COMPACTION_INTERVAL_SEC: &str = "compaction_interval_sec";
106pub const ENABLE_SNAPSHOT_EXPIRATION: &str = "enable_snapshot_expiration";
107pub const WRITE_MODE: &str = "write_mode";
108pub const SNAPSHOT_EXPIRATION_RETAIN_LAST: &str = "snapshot_expiration_retain_last";
109pub const SNAPSHOT_EXPIRATION_MAX_AGE_MILLIS: &str = "snapshot_expiration_max_age_millis";
110pub const SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_FILES: &str = "snapshot_expiration_clear_expired_files";
111pub const SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_META_DATA: &str =
112    "snapshot_expiration_clear_expired_meta_data";
113pub const MAX_SNAPSHOTS_NUM: &str = "max_snapshots_num_before_compaction";
114
115fn default_commit_retry_num() -> u32 {
116    8
117}
118
119fn default_iceberg_write_mode() -> String {
120    ICEBERG_WRITE_MODE_MERGE_ON_READ.to_owned()
121}
122
123fn default_true() -> bool {
124    true
125}
126
127fn default_some_true() -> Option<bool> {
128    Some(true)
129}
130
131#[serde_as]
132#[derive(Debug, Clone, PartialEq, Eq, Deserialize, WithOptions)]
133pub struct IcebergConfig {
134    pub r#type: String, // accept "append-only" or "upsert"
135
136    #[serde(default, deserialize_with = "deserialize_bool_from_string")]
137    pub force_append_only: bool,
138
139    #[serde(flatten)]
140    common: IcebergCommon,
141
142    #[serde(flatten)]
143    table: IcebergTableIdentifier,
144
145    #[serde(
146        rename = "primary_key",
147        default,
148        deserialize_with = "deserialize_optional_string_seq_from_string"
149    )]
150    pub primary_key: Option<Vec<String>>,
151
    // Properties passed through to the java catalog.
153    #[serde(skip)]
154    pub java_catalog_props: HashMap<String, String>,
155
156    #[serde(default)]
157    pub partition_by: Option<String>,
158
    /// Commit every n (> 0) checkpoints. Defaults to 60.
160    #[serde(default = "iceberg_default_commit_checkpoint_interval")]
161    #[serde_as(as = "DisplayFromStr")]
162    #[with_option(allow_alter_on_fly)]
163    pub commit_checkpoint_interval: u64,
164
165    #[serde(default, deserialize_with = "deserialize_bool_from_string")]
166    pub create_table_if_not_exists: bool,
167
    /// Whether to enable exactly-once delivery. Defaults to true.
169    #[serde(default = "default_some_true")]
170    #[serde_as(as = "Option<DisplayFromStr>")]
171    pub is_exactly_once: Option<bool>,
    // Number of times to retry a failed iceberg commit. Defaults to 8.
    // # TODO
    // The iceberg table may store a commit retry count in its table metadata.
    // We should look that up and prefer it as the default retry count.
176    #[serde(default = "default_commit_retry_num")]
177    pub commit_retry_num: u32,
178
179    /// Whether to enable iceberg compaction.
180    #[serde(
181        rename = "enable_compaction",
182        default,
183        deserialize_with = "deserialize_bool_from_string"
184    )]
185    #[with_option(allow_alter_on_fly)]
186    pub enable_compaction: bool,
187
    /// The interval (in seconds) between iceberg compaction runs.
189    #[serde(rename = "compaction_interval_sec", default)]
190    #[serde_as(as = "Option<DisplayFromStr>")]
191    #[with_option(allow_alter_on_fly)]
192    pub compaction_interval_sec: Option<u64>,
193
    /// Whether to enable iceberg snapshot expiration.
195    #[serde(
196        rename = "enable_snapshot_expiration",
197        default,
198        deserialize_with = "deserialize_bool_from_string"
199    )]
200    #[with_option(allow_alter_on_fly)]
201    pub enable_snapshot_expiration: bool,
202
203    /// The iceberg write mode, can be `merge-on-read` or `copy-on-write`.
204    #[serde(rename = "write_mode", default = "default_iceberg_write_mode")]
205    pub write_mode: String,
206
    /// The maximum age (in milliseconds) of snapshots before they expire.
    /// For example, if set to 3600000, snapshots older than 1 hour are expired.
209    #[serde(rename = "snapshot_expiration_max_age_millis", default)]
210    #[serde_as(as = "Option<DisplayFromStr>")]
211    #[with_option(allow_alter_on_fly)]
212    pub snapshot_expiration_max_age_millis: Option<i64>,
213
    /// The number of most recent snapshots to retain when expiring snapshots.
215    #[serde(rename = "snapshot_expiration_retain_last", default)]
216    #[serde_as(as = "Option<DisplayFromStr>")]
217    #[with_option(allow_alter_on_fly)]
218    pub snapshot_expiration_retain_last: Option<i32>,
219
220    #[serde(
221        rename = "snapshot_expiration_clear_expired_files",
222        default = "default_true",
223        deserialize_with = "deserialize_bool_from_string"
224    )]
225    #[with_option(allow_alter_on_fly)]
226    pub snapshot_expiration_clear_expired_files: bool,
227
228    #[serde(
229        rename = "snapshot_expiration_clear_expired_meta_data",
230        default = "default_true",
231        deserialize_with = "deserialize_bool_from_string"
232    )]
233    #[with_option(allow_alter_on_fly)]
234    pub snapshot_expiration_clear_expired_meta_data: bool,
235
    /// The maximum number of snapshots allowed since the last rewrite operation.
    /// If set, the sink checks the snapshot count and waits when the limit is exceeded.
238    #[serde(rename = "max_snapshots_num_before_compaction", default)]
239    #[serde_as(as = "Option<DisplayFromStr>")]
240    #[with_option(allow_alter_on_fly)]
241    pub max_snapshots_num_before_compaction: Option<usize>,
242}
243
244impl EnforceSecret for IcebergConfig {
245    fn enforce_secret<'a>(
246        prop_iter: impl Iterator<Item = &'a str>,
247    ) -> crate::error::ConnectorResult<()> {
248        for prop in prop_iter {
249            IcebergCommon::enforce_one(prop)?;
250        }
251        Ok(())
252    }
253
254    fn enforce_one(prop: &str) -> crate::error::ConnectorResult<()> {
255        IcebergCommon::enforce_one(prop)
256    }
257}
258
259impl IcebergConfig {
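    /// A minimal, illustrative sketch of building a config from WITH-style options.
    /// The example is marked `ignore`: the catalog/table keys shown are assumptions,
    /// since the exact key set is defined by `IcebergCommon` and `IcebergTableIdentifier`.
    ///
    /// ```ignore
    /// use std::collections::BTreeMap;
    ///
    /// let mut props = BTreeMap::new();
    /// props.insert("type".to_owned(), "upsert".to_owned());
    /// props.insert("primary_key".to_owned(), "id".to_owned());
    /// // Assumed catalog/table option keys, for illustration only:
    /// props.insert("catalog.type".to_owned(), "rest".to_owned());
    /// props.insert("database.name".to_owned(), "demo_db".to_owned());
    /// props.insert("table.name".to_owned(), "demo_table".to_owned());
    ///
    /// let config = IcebergConfig::from_btreemap(props)?;
    /// assert_eq!(config.r#type, "upsert");
    /// ```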
260    pub fn from_btreemap(values: BTreeMap<String, String>) -> Result<Self> {
261        let mut config =
262            serde_json::from_value::<IcebergConfig>(serde_json::to_value(&values).unwrap())
263                .map_err(|e| SinkError::Config(anyhow!(e)))?;
264
265        if config.r#type != SINK_TYPE_APPEND_ONLY && config.r#type != SINK_TYPE_UPSERT {
266            return Err(SinkError::Config(anyhow!(
267                "`{}` must be {}, or {}",
268                SINK_TYPE_OPTION,
269                SINK_TYPE_APPEND_ONLY,
270                SINK_TYPE_UPSERT
271            )));
272        }
273
274        if config.r#type == SINK_TYPE_UPSERT {
275            if let Some(primary_key) = &config.primary_key {
276                if primary_key.is_empty() {
277                    return Err(SinkError::Config(anyhow!(
278                        "`primary_key` must not be empty in {}",
279                        SINK_TYPE_UPSERT
280                    )));
281                }
282            } else {
283                return Err(SinkError::Config(anyhow!(
284                    "Must set `primary_key` in {}",
285                    SINK_TYPE_UPSERT
286                )));
287            }
288        }
289
        // All configs starting with "catalog." are treated as java catalog configs.
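        // e.g. an assumed key "catalog.jdbc.user" is stored as "jdbc.user": the 8-char
        // "catalog." prefix is stripped below.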
291        config.java_catalog_props = values
292            .iter()
293            .filter(|(k, _v)| {
294                k.starts_with("catalog.")
295                    && k != &"catalog.uri"
296                    && k != &"catalog.type"
297                    && k != &"catalog.name"
298                    && k != &"catalog.header"
299            })
300            .map(|(k, v)| (k[8..].to_string(), v.clone()))
301            .collect();
302
303        if config.commit_checkpoint_interval == 0 {
304            return Err(SinkError::Config(anyhow!(
305                "`commit_checkpoint_interval` must be greater than 0"
306            )));
307        }
308
309        Ok(config)
310    }
311
312    pub fn catalog_type(&self) -> &str {
313        self.common.catalog_type()
314    }
315
316    pub async fn load_table(&self) -> Result<Table> {
317        self.common
318            .load_table(&self.table, &self.java_catalog_props)
319            .await
320            .map_err(Into::into)
321    }
322
323    pub async fn create_catalog(&self) -> Result<Arc<dyn Catalog>> {
324        self.common
325            .create_catalog(&self.java_catalog_props)
326            .await
327            .map_err(Into::into)
328    }
329
330    pub fn full_table_name(&self) -> Result<TableIdent> {
331        self.table.to_table_ident().map_err(Into::into)
332    }
333
334    pub fn catalog_name(&self) -> String {
335        self.common.catalog_name()
336    }
337
338    pub fn compaction_interval_sec(&self) -> u64 {
339        // default to 1 hour
340        self.compaction_interval_sec.unwrap_or(3600)
341    }
342
343    /// Calculate the timestamp (in milliseconds) before which snapshots should be expired
344    /// Returns `current_time_ms` - `max_age_millis`
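    /// For example, with `current_time_ms = 1_700_000_000_000` and `max_age_millis = 3_600_000`
    /// (one hour), this returns `1_699_996_400_000`; snapshots older than that timestamp expire.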
345    pub fn snapshot_expiration_timestamp_ms(&self, current_time_ms: i64) -> Option<i64> {
346        self.snapshot_expiration_max_age_millis
347            .map(|max_age_millis| current_time_ms - max_age_millis)
348    }
349}
350
351pub struct IcebergSink {
352    pub config: IcebergConfig,
353    param: SinkParam,
    // In upsert mode, this is never `None` or empty.
355    unique_column_ids: Option<Vec<usize>>,
356}
357
358impl EnforceSecret for IcebergSink {
359    fn enforce_secret<'a>(
360        prop_iter: impl Iterator<Item = &'a str>,
361    ) -> crate::error::ConnectorResult<()> {
362        for prop in prop_iter {
363            IcebergConfig::enforce_one(prop)?;
364        }
365        Ok(())
366    }
367}
368
369impl TryFrom<SinkParam> for IcebergSink {
370    type Error = SinkError;
371
372    fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
373        let config = IcebergConfig::from_btreemap(param.properties.clone())?;
374        IcebergSink::new(config, param)
375    }
376}
377
378impl Debug for IcebergSink {
379    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
380        f.debug_struct("IcebergSink")
381            .field("config", &self.config)
382            .finish()
383    }
384}
385
386impl IcebergSink {
387    pub async fn create_and_validate_table(&self) -> Result<Table> {
388        if self.config.create_table_if_not_exists {
389            self.create_table_if_not_exists().await?;
390        }
391
392        let table = self
393            .config
394            .load_table()
395            .await
396            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
397
398        let sink_schema = self.param.schema();
399        let iceberg_arrow_schema = schema_to_arrow_schema(table.metadata().current_schema())
400            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
401
402        try_matches_arrow_schema(&sink_schema, &iceberg_arrow_schema)
403            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
404
405        Ok(table)
406    }
407
408    pub async fn create_table_if_not_exists(&self) -> Result<()> {
409        let catalog = self.config.create_catalog().await?;
410        let namespace = if let Some(database_name) = self.config.table.database_name() {
411            let namespace = NamespaceIdent::new(database_name.to_owned());
412            if !catalog
413                .namespace_exists(&namespace)
414                .await
415                .map_err(|e| SinkError::Iceberg(anyhow!(e)))?
416            {
417                catalog
418                    .create_namespace(&namespace, HashMap::default())
419                    .await
420                    .map_err(|e| SinkError::Iceberg(anyhow!(e)))
421                    .context("failed to create iceberg namespace")?;
422            }
423            namespace
424        } else {
425            bail!("database name must be set if you want to create table")
426        };
427
428        let table_id = self
429            .config
430            .full_table_name()
431            .context("Unable to parse table name")?;
432        if !catalog
433            .table_exists(&table_id)
434            .await
435            .map_err(|e| SinkError::Iceberg(anyhow!(e)))?
436        {
437            let iceberg_create_table_arrow_convert = IcebergCreateTableArrowConvert::default();
438            // convert risingwave schema -> arrow schema -> iceberg schema
439            let arrow_fields = self
440                .param
441                .columns
442                .iter()
443                .map(|column| {
444                    Ok(iceberg_create_table_arrow_convert
445                        .to_arrow_field(&column.name, &column.data_type)
446                        .map_err(|e| SinkError::Iceberg(anyhow!(e)))
447                        .context(format!(
448                            "failed to convert {}: {} to arrow type",
449                            &column.name, &column.data_type
450                        ))?)
451                })
452                .collect::<Result<Vec<ArrowField>>>()?;
453            let arrow_schema = arrow_schema_iceberg::Schema::new(arrow_fields);
454            let iceberg_schema = iceberg::arrow::arrow_schema_to_schema(&arrow_schema)
455                .map_err(|e| SinkError::Iceberg(anyhow!(e)))
456                .context("failed to convert arrow schema to iceberg schema")?;
457
458            let location = {
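                // e.g. an assumed warehouse path "s3://bucket/warehouse" with namespace "db" and
                // table "tbl" yields the location "s3://bucket/warehouse/db/tbl".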
459                let mut names = namespace.clone().inner();
460                names.push(self.config.table.table_name().to_owned());
461                match &self.config.common.warehouse_path {
462                    Some(warehouse_path) => {
463                        let is_s3_tables = warehouse_path.starts_with("arn:aws:s3tables");
464                        let url = Url::parse(warehouse_path);
465                        if url.is_err() || is_s3_tables {
                            // For a rest catalog, the warehouse_path could be a warehouse name
                            // rather than a URL. In this case, leave the location unset and let
                            // the catalog assign it when creating the table.
468                            if self.config.common.catalog_type() == "rest"
469                                || self.config.common.catalog_type() == "rest_rust"
470                            {
471                                None
472                            } else {
473                                bail!(format!("Invalid warehouse path: {}", warehouse_path))
474                            }
475                        } else if warehouse_path.ends_with('/') {
476                            Some(format!("{}{}", warehouse_path, names.join("/")))
477                        } else {
478                            Some(format!("{}/{}", warehouse_path, names.join("/")))
479                        }
480                    }
481                    None => None,
482                }
483            };
484
485            let partition_spec = match &self.config.partition_by {
486                Some(partition_by) => {
487                    let mut partition_fields = Vec::<UnboundPartitionField>::new();
488                    for (i, (column, transform)) in parse_partition_by_exprs(partition_by.clone())?
489                        .into_iter()
490                        .enumerate()
491                    {
492                        match iceberg_schema.field_id_by_name(&column) {
493                            Some(id) => partition_fields.push(
494                                UnboundPartitionField::builder()
495                                    .source_id(id)
496                                    .transform(transform)
497                                    .name(format!("_p_{}", column))
498                                    .field_id(i as i32)
499                                    .build(),
500                            ),
501                            None => bail!(format!(
502                                "Partition source column does not exist in schema: {}",
503                                column
504                            )),
505                        };
506                    }
507                    Some(
508                        UnboundPartitionSpec::builder()
509                            .with_spec_id(0)
510                            .add_partition_fields(partition_fields)
511                            .map_err(|e| SinkError::Iceberg(anyhow!(e)))
512                            .context("failed to add partition columns")?
513                            .build(),
514                    )
515                }
516                None => None,
517            };
518
519            let table_creation_builder = TableCreation::builder()
520                .name(self.config.table.table_name().to_owned())
521                .schema(iceberg_schema);
522
523            let table_creation = match (location, partition_spec) {
524                (Some(location), Some(partition_spec)) => table_creation_builder
525                    .location(location)
526                    .partition_spec(partition_spec)
527                    .build(),
528                (Some(location), None) => table_creation_builder.location(location).build(),
529                (None, Some(partition_spec)) => table_creation_builder
530                    .partition_spec(partition_spec)
531                    .build(),
532                (None, None) => table_creation_builder.build(),
533            };
534
535            catalog
536                .create_table(&namespace, table_creation)
537                .await
538                .map_err(|e| SinkError::Iceberg(anyhow!(e)))
539                .context("failed to create iceberg table")?;
540        }
541        Ok(())
542    }
543
544    pub fn new(config: IcebergConfig, param: SinkParam) -> Result<Self> {
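        // Map each primary-key column name to its catalog column id, e.g. an assumed pk
        // ["id"] over columns with ids [0, 1, 2] yields unique_column_ids = Some(vec![0]).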
545        let unique_column_ids = if config.r#type == SINK_TYPE_UPSERT && !config.force_append_only {
546            if let Some(pk) = &config.primary_key {
547                let mut unique_column_ids = Vec::with_capacity(pk.len());
548                for col_name in pk {
549                    let id = param
550                        .columns
551                        .iter()
552                        .find(|col| col.name.as_str() == col_name)
553                        .ok_or_else(|| {
554                            SinkError::Config(anyhow!(
555                                "Primary key column {} not found in sink schema",
556                                col_name
557                            ))
558                        })?
559                        .column_id
560                        .get_id() as usize;
561                    unique_column_ids.push(id);
562                }
563                Some(unique_column_ids)
564            } else {
565                unreachable!()
566            }
567        } else {
568            None
569        };
570        Ok(Self {
571            config,
572            param,
573            unique_column_ids,
574        })
575    }
576}
577
578impl Sink for IcebergSink {
579    type Coordinator = IcebergSinkCommitter;
580    type LogSinker = CoordinatedLogSinker<IcebergSinkWriter>;
581
582    const SINK_NAME: &'static str = ICEBERG_SINK;
583
584    async fn validate(&self) -> Result<()> {
585        if "snowflake".eq_ignore_ascii_case(self.config.catalog_type()) {
586            bail!("Snowflake catalog only supports iceberg sources");
587        }
588        if "glue".eq_ignore_ascii_case(self.config.catalog_type()) {
589            risingwave_common::license::Feature::IcebergSinkWithGlue
590                .check_available()
591                .map_err(|e| anyhow::anyhow!(e))?;
592        }
593
594        let _ = self.create_and_validate_table().await?;
595        Ok(())
596    }
597
598    fn validate_alter_config(config: &BTreeMap<String, String>) -> Result<()> {
599        let iceberg_config = IcebergConfig::from_btreemap(config.clone())?;
600
601        if let Some(compaction_interval) = iceberg_config.compaction_interval_sec {
602            if iceberg_config.enable_compaction && compaction_interval == 0 {
603                bail!(
604                    "`compaction_interval_sec` must be greater than 0 when `enable_compaction` is true"
605                );
606            } else {
607                // log compaction_interval
608                tracing::info!(
609                    "Alter config compaction_interval set to {} seconds",
610                    compaction_interval
611                );
612            }
613        }
614
615        if let Some(max_snapshots) = iceberg_config.max_snapshots_num_before_compaction
616            && max_snapshots < 1
617        {
618            bail!(
619                "`max_snapshots_num_before_compaction` must be greater than 0, got: {}",
620                max_snapshots
621            );
622        }
623
624        Ok(())
625    }
626
627    async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
628        let table = self.create_and_validate_table().await?;
629        let inner = if let Some(unique_column_ids) = &self.unique_column_ids {
630            IcebergSinkWriter::new_upsert(table, unique_column_ids.clone(), &writer_param).await?
631        } else {
632            IcebergSinkWriter::new_append_only(table, &writer_param).await?
633        };
634
635        let commit_checkpoint_interval =
636            NonZeroU64::new(self.config.commit_checkpoint_interval).expect(
637                "commit_checkpoint_interval should be greater than 0, and it should be checked in config validation",
638            );
639        let writer = CoordinatedLogSinker::new(
640            &writer_param,
641            self.param.clone(),
642            inner,
643            commit_checkpoint_interval,
644        )
645        .await?;
646
647        Ok(writer)
648    }
649
650    fn is_coordinated_sink(&self) -> bool {
651        true
652    }
653
654    async fn new_coordinator(
655        &self,
656        db: DatabaseConnection,
657        iceberg_compact_stat_sender: Option<UnboundedSender<IcebergSinkCompactionUpdate>>,
658    ) -> Result<Self::Coordinator> {
659        let catalog = self.config.create_catalog().await?;
660        let table = self.create_and_validate_table().await?;
661        Ok(IcebergSinkCommitter {
662            catalog,
663            table,
664            is_exactly_once: self.config.is_exactly_once.unwrap_or_default(),
665            last_commit_epoch: 0,
666            sink_id: self.param.sink_id.sink_id(),
667            config: self.config.clone(),
668            param: self.param.clone(),
669            db,
670            commit_retry_num: self.config.commit_retry_num,
671            committed_epoch_subscriber: None,
672            iceberg_compact_stat_sender,
673        })
674    }
675}
676
/// `None` means no projection is needed.
/// `Prepare` holds the index of the extra partition column.
/// `Done` holds the computed projection index vector.
///
/// `ProjectIdxVec` is evaluated lazily: the first time we hit the `Prepare` state, the data chunk
/// schema is used to build the projection index vector.
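///
/// For example, with a 4-column chunk and the extra partition column at index 2 (an assumed
/// layout), `Prepare(2)` is turned into `Done(vec![0, 1, 3])` when the first chunk is written.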
683enum ProjectIdxVec {
684    None,
685    Prepare(usize),
686    Done(Vec<usize>),
687}
688
689pub struct IcebergSinkWriter {
690    writer: IcebergWriterDispatch,
691    arrow_schema: SchemaRef,
    // See the comments on `IcebergWriterMetrics` below.
693    metrics: IcebergWriterMetrics,
694    // State of iceberg table for this writer
695    table: Table,
    // For chunks with an extra partition column, we remove that column before writing.
    // The projection index vec is cached here so it is not rebuilt for every chunk.
698    project_idx_vec: ProjectIdxVec,
699}
700
701#[allow(clippy::type_complexity)]
702enum IcebergWriterDispatch {
703    PartitionAppendOnly {
704        writer: Option<Box<dyn IcebergWriter>>,
705        writer_builder: MonitoredGeneralWriterBuilder<
706            FanoutPartitionWriterBuilder<
707                DataFileWriterBuilder<
708                    ParquetWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>,
709                >,
710            >,
711        >,
712    },
713    NonPartitionAppendOnly {
714        writer: Option<Box<dyn IcebergWriter>>,
715        writer_builder: MonitoredGeneralWriterBuilder<
716            DataFileWriterBuilder<
717                ParquetWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>,
718            >,
719        >,
720    },
721    PartitionUpsert {
722        writer: Option<Box<dyn IcebergWriter>>,
723        writer_builder: MonitoredGeneralWriterBuilder<
724            FanoutPartitionWriterBuilder<
725                EqualityDeltaWriterBuilder<
726                    DataFileWriterBuilder<
727                        ParquetWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>,
728                    >,
729                    MonitoredPositionDeleteWriterBuilder<
730                        ParquetWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>,
731                    >,
732                    EqualityDeleteFileWriterBuilder<
733                        ParquetWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>,
734                    >,
735                >,
736            >,
737        >,
738        arrow_schema_with_op_column: SchemaRef,
739    },
740    NonpartitionUpsert {
741        writer: Option<Box<dyn IcebergWriter>>,
742        writer_builder: MonitoredGeneralWriterBuilder<
743            EqualityDeltaWriterBuilder<
744                DataFileWriterBuilder<
745                    ParquetWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>,
746                >,
747                MonitoredPositionDeleteWriterBuilder<
748                    ParquetWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>,
749                >,
750                EqualityDeleteFileWriterBuilder<
751                    ParquetWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>,
752                >,
753            >,
754        >,
755        arrow_schema_with_op_column: SchemaRef,
756    },
757}
758
759impl IcebergWriterDispatch {
760    pub fn get_writer(&mut self) -> Option<&mut Box<dyn IcebergWriter>> {
761        match self {
762            IcebergWriterDispatch::PartitionAppendOnly { writer, .. }
763            | IcebergWriterDispatch::NonPartitionAppendOnly { writer, .. }
764            | IcebergWriterDispatch::PartitionUpsert { writer, .. }
765            | IcebergWriterDispatch::NonpartitionUpsert { writer, .. } => writer.as_mut(),
766        }
767    }
768}
769
770pub struct IcebergWriterMetrics {
771    // NOTE: These 2 metrics are not used directly by us, but only kept for lifecycle management.
772    // They are actually used in `PrometheusWriterBuilder`:
773    //     WriterMetrics::new(write_qps.deref().clone(), write_latency.deref().clone())
    // We keep them here so that the guards clean up the labels from the metrics registry when dropped.
775    _write_qps: LabelGuardedIntCounter,
776    _write_latency: LabelGuardedHistogram,
777    write_bytes: LabelGuardedIntCounter,
778}
779
780impl IcebergSinkWriter {
781    pub async fn new_append_only(table: Table, writer_param: &SinkWriterParam) -> Result<Self> {
782        let SinkWriterParam {
783            extra_partition_col_idx,
784            actor_id,
785            sink_id,
786            sink_name,
787            ..
788        } = writer_param;
789        let metrics_labels = [
790            &actor_id.to_string(),
791            &sink_id.to_string(),
792            sink_name.as_str(),
793        ];
794
795        // Metrics
796        let write_qps = GLOBAL_SINK_METRICS
797            .iceberg_write_qps
798            .with_guarded_label_values(&metrics_labels);
799        let write_latency = GLOBAL_SINK_METRICS
800            .iceberg_write_latency
801            .with_guarded_label_values(&metrics_labels);
802        // # TODO
        // Unused. Add this metric later.
804        let _rolling_unflushed_data_file = GLOBAL_SINK_METRICS
805            .iceberg_rolling_unflushed_data_file
806            .with_guarded_label_values(&metrics_labels);
807        let write_bytes = GLOBAL_SINK_METRICS
808            .iceberg_write_bytes
809            .with_guarded_label_values(&metrics_labels);
810
811        let schema = table.metadata().current_schema();
812        let partition_spec = table.metadata().default_partition_spec();
813
        // To avoid duplicate file names, each sink instance generates a unique uuid as the file name suffix when it is created.
815        let unique_uuid_suffix = Uuid::now_v7();
816
817        let parquet_writer_properties = WriterProperties::builder()
818            .set_max_row_group_size(
819                writer_param
820                    .streaming_config
821                    .developer
822                    .iceberg_sink_write_parquet_max_row_group_rows,
823            )
824            .build();
825
826        let parquet_writer_builder = ParquetWriterBuilder::new(
827            parquet_writer_properties,
828            schema.clone(),
829            table.file_io().clone(),
830            DefaultLocationGenerator::new(table.metadata().clone())
831                .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
832            DefaultFileNameGenerator::new(
833                writer_param.actor_id.to_string(),
834                Some(unique_uuid_suffix.to_string()),
835                iceberg::spec::DataFileFormat::Parquet,
836            ),
837        );
838        let data_file_builder =
839            DataFileWriterBuilder::new(parquet_writer_builder, None, partition_spec.spec_id());
840        if partition_spec.fields().is_empty() {
841            let writer_builder = MonitoredGeneralWriterBuilder::new(
842                data_file_builder,
843                write_qps.clone(),
844                write_latency.clone(),
845            );
846            let inner_writer = Some(Box::new(
847                writer_builder
848                    .clone()
849                    .build()
850                    .await
851                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
852            ) as Box<dyn IcebergWriter>);
853            Ok(Self {
854                arrow_schema: Arc::new(
855                    schema_to_arrow_schema(table.metadata().current_schema())
856                        .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
857                ),
858                metrics: IcebergWriterMetrics {
859                    _write_qps: write_qps,
860                    _write_latency: write_latency,
861                    write_bytes,
862                },
863                writer: IcebergWriterDispatch::NonPartitionAppendOnly {
864                    writer: inner_writer,
865                    writer_builder,
866                },
867                table,
868                project_idx_vec: {
869                    if let Some(extra_partition_col_idx) = extra_partition_col_idx {
870                        ProjectIdxVec::Prepare(*extra_partition_col_idx)
871                    } else {
872                        ProjectIdxVec::None
873                    }
874                },
875            })
876        } else {
877            let partition_builder = MonitoredGeneralWriterBuilder::new(
878                FanoutPartitionWriterBuilder::new(
879                    data_file_builder,
880                    partition_spec.clone(),
881                    schema.clone(),
882                )
883                .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
884                write_qps.clone(),
885                write_latency.clone(),
886            );
887            let inner_writer = Some(Box::new(
888                partition_builder
889                    .clone()
890                    .build()
891                    .await
892                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
893            ) as Box<dyn IcebergWriter>);
894            Ok(Self {
895                arrow_schema: Arc::new(
896                    schema_to_arrow_schema(table.metadata().current_schema())
897                        .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
898                ),
899                metrics: IcebergWriterMetrics {
900                    _write_qps: write_qps,
901                    _write_latency: write_latency,
902                    write_bytes,
903                },
904                writer: IcebergWriterDispatch::PartitionAppendOnly {
905                    writer: inner_writer,
906                    writer_builder: partition_builder,
907                },
908                table,
909                project_idx_vec: {
910                    if let Some(extra_partition_col_idx) = extra_partition_col_idx {
911                        ProjectIdxVec::Prepare(*extra_partition_col_idx)
912                    } else {
913                        ProjectIdxVec::None
914                    }
915                },
916            })
917        }
918    }
919
920    pub async fn new_upsert(
921        table: Table,
922        unique_column_ids: Vec<usize>,
923        writer_param: &SinkWriterParam,
924    ) -> Result<Self> {
925        let SinkWriterParam {
926            extra_partition_col_idx,
927            actor_id,
928            sink_id,
929            sink_name,
930            ..
931        } = writer_param;
932        let metrics_labels = [
933            &actor_id.to_string(),
934            &sink_id.to_string(),
935            sink_name.as_str(),
936        ];
937        let unique_column_ids: Vec<_> = unique_column_ids.into_iter().map(|id| id as i32).collect();
938
939        // Metrics
940        let write_qps = GLOBAL_SINK_METRICS
941            .iceberg_write_qps
942            .with_guarded_label_values(&metrics_labels);
943        let write_latency = GLOBAL_SINK_METRICS
944            .iceberg_write_latency
945            .with_guarded_label_values(&metrics_labels);
946        // # TODO
        // Unused. Add this metric later.
948        let _rolling_unflushed_data_file = GLOBAL_SINK_METRICS
949            .iceberg_rolling_unflushed_data_file
950            .with_guarded_label_values(&metrics_labels);
951        let position_delete_cache_num = GLOBAL_SINK_METRICS
952            .iceberg_position_delete_cache_num
953            .with_guarded_label_values(&metrics_labels);
954        let write_bytes = GLOBAL_SINK_METRICS
955            .iceberg_write_bytes
956            .with_guarded_label_values(&metrics_labels);
957
958        // Determine the schema id and partition spec id
959        let schema = table.metadata().current_schema();
960        let partition_spec = table.metadata().default_partition_spec();
961
        // To avoid duplicate file names, each sink instance generates a unique uuid as the file name suffix when it is created.
963        let unique_uuid_suffix = Uuid::now_v7();
964
965        let parquet_writer_properties = WriterProperties::builder()
966            .set_max_row_group_size(
967                writer_param
968                    .streaming_config
969                    .developer
970                    .iceberg_sink_write_parquet_max_row_group_rows,
971            )
972            .build();
973
974        let data_file_builder = {
975            let parquet_writer_builder = ParquetWriterBuilder::new(
976                parquet_writer_properties.clone(),
977                schema.clone(),
978                table.file_io().clone(),
979                DefaultLocationGenerator::new(table.metadata().clone())
980                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
981                DefaultFileNameGenerator::new(
982                    writer_param.actor_id.to_string(),
983                    Some(unique_uuid_suffix.to_string()),
984                    iceberg::spec::DataFileFormat::Parquet,
985                ),
986            );
987            DataFileWriterBuilder::new(parquet_writer_builder, None, partition_spec.spec_id())
988        };
989        let position_delete_builder = {
990            let parquet_writer_builder = ParquetWriterBuilder::new(
991                parquet_writer_properties.clone(),
992                POSITION_DELETE_SCHEMA.clone(),
993                table.file_io().clone(),
994                DefaultLocationGenerator::new(table.metadata().clone())
995                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
996                DefaultFileNameGenerator::new(
997                    writer_param.actor_id.to_string(),
998                    Some(format!("pos-del-{}", unique_uuid_suffix)),
999                    iceberg::spec::DataFileFormat::Parquet,
1000                ),
1001            );
1002            MonitoredPositionDeleteWriterBuilder::new(
1003                SortPositionDeleteWriterBuilder::new(
1004                    parquet_writer_builder,
1005                    writer_param
1006                        .streaming_config
1007                        .developer
1008                        .iceberg_sink_positional_delete_cache_size,
1009                    None,
1010                    None,
1011                ),
1012                position_delete_cache_num,
1013            )
1014        };
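        // The equality-delete writer only writes the unique (primary-key) columns, so its
        // parquet schema is derived from the projected arrow schema of the config below.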
1015        let equality_delete_builder = {
1016            let config = EqualityDeleteWriterConfig::new(
1017                unique_column_ids.clone(),
1018                table.metadata().current_schema().clone(),
1019                None,
1020                partition_spec.spec_id(),
1021            )
1022            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
1023            let parquet_writer_builder = ParquetWriterBuilder::new(
1024                parquet_writer_properties.clone(),
1025                Arc::new(
1026                    arrow_schema_to_schema(config.projected_arrow_schema_ref())
1027                        .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1028                ),
1029                table.file_io().clone(),
1030                DefaultLocationGenerator::new(table.metadata().clone())
1031                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1032                DefaultFileNameGenerator::new(
1033                    writer_param.actor_id.to_string(),
1034                    Some(format!("eq-del-{}", unique_uuid_suffix)),
1035                    iceberg::spec::DataFileFormat::Parquet,
1036                ),
1037            );
1038
1039            EqualityDeleteFileWriterBuilder::new(parquet_writer_builder, config)
1040        };
1041        let delta_builder = EqualityDeltaWriterBuilder::new(
1042            data_file_builder,
1043            position_delete_builder,
1044            equality_delete_builder,
1045            unique_column_ids,
1046        );
1047        if partition_spec.fields().is_empty() {
1048            let writer_builder = MonitoredGeneralWriterBuilder::new(
1049                delta_builder,
1050                write_qps.clone(),
1051                write_latency.clone(),
1052            );
1053            let inner_writer = Some(Box::new(
1054                writer_builder
1055                    .clone()
1056                    .build()
1057                    .await
1058                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1059            ) as Box<dyn IcebergWriter>);
1060            let original_arrow_schema = Arc::new(
1061                schema_to_arrow_schema(table.metadata().current_schema())
1062                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1063            );
1064            let schema_with_extra_op_column = {
1065                let mut new_fields = original_arrow_schema.fields().iter().cloned().collect_vec();
1066                new_fields.push(Arc::new(ArrowField::new(
1067                    "op".to_owned(),
1068                    ArrowDataType::Int32,
1069                    false,
1070                )));
1071                Arc::new(ArrowSchema::new(new_fields))
1072            };
1073            Ok(Self {
1074                arrow_schema: original_arrow_schema,
1075                metrics: IcebergWriterMetrics {
1076                    _write_qps: write_qps,
1077                    _write_latency: write_latency,
1078                    write_bytes,
1079                },
1080                table,
1081                writer: IcebergWriterDispatch::NonpartitionUpsert {
1082                    writer: inner_writer,
1083                    writer_builder,
1084                    arrow_schema_with_op_column: schema_with_extra_op_column,
1085                },
1086                project_idx_vec: {
1087                    if let Some(extra_partition_col_idx) = extra_partition_col_idx {
1088                        ProjectIdxVec::Prepare(*extra_partition_col_idx)
1089                    } else {
1090                        ProjectIdxVec::None
1091                    }
1092                },
1093            })
1094        } else {
1095            let original_arrow_schema = Arc::new(
1096                schema_to_arrow_schema(table.metadata().current_schema())
1097                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1098            );
1099            let schema_with_extra_op_column = {
1100                let mut new_fields = original_arrow_schema.fields().iter().cloned().collect_vec();
1101                new_fields.push(Arc::new(ArrowField::new(
1102                    "op".to_owned(),
1103                    ArrowDataType::Int32,
1104                    false,
1105                )));
1106                Arc::new(ArrowSchema::new(new_fields))
1107            };
1108            let partition_builder = MonitoredGeneralWriterBuilder::new(
1109                FanoutPartitionWriterBuilder::new_with_custom_schema(
1110                    delta_builder,
1111                    schema_with_extra_op_column.clone(),
1112                    partition_spec.clone(),
1113                    table.metadata().current_schema().clone(),
1114                ),
1115                write_qps.clone(),
1116                write_latency.clone(),
1117            );
1118            let inner_writer = Some(Box::new(
1119                partition_builder
1120                    .clone()
1121                    .build()
1122                    .await
1123                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1124            ) as Box<dyn IcebergWriter>);
1125            Ok(Self {
1126                arrow_schema: original_arrow_schema,
1127                metrics: IcebergWriterMetrics {
1128                    _write_qps: write_qps,
1129                    _write_latency: write_latency,
1130                    write_bytes,
1131                },
1132                table,
1133                writer: IcebergWriterDispatch::PartitionUpsert {
1134                    writer: inner_writer,
1135                    writer_builder: partition_builder,
1136                    arrow_schema_with_op_column: schema_with_extra_op_column,
1137                },
1138                project_idx_vec: {
1139                    if let Some(extra_partition_col_idx) = extra_partition_col_idx {
1140                        ProjectIdxVec::Prepare(*extra_partition_col_idx)
1141                    } else {
1142                        ProjectIdxVec::None
1143                    }
1144                },
1145            })
1146        }
1147    }
1148}
1149
1150#[async_trait]
1151impl SinkWriter for IcebergSinkWriter {
1152    type CommitMetadata = Option<SinkMetadata>;
1153
1154    /// Begin a new epoch
1155    async fn begin_epoch(&mut self, _epoch: u64) -> Result<()> {
1156        // Just skip it.
1157        Ok(())
1158    }
1159
1160    /// Write a stream chunk to sink
1161    async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> {
        // Build the writer if it has not been created yet.
1163        match &mut self.writer {
1164            IcebergWriterDispatch::PartitionAppendOnly {
1165                writer,
1166                writer_builder,
1167            } => {
1168                if writer.is_none() {
1169                    *writer = Some(Box::new(
1170                        writer_builder
1171                            .clone()
1172                            .build()
1173                            .await
1174                            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1175                    ));
1176                }
1177            }
1178            IcebergWriterDispatch::NonPartitionAppendOnly {
1179                writer,
1180                writer_builder,
1181            } => {
1182                if writer.is_none() {
1183                    *writer = Some(Box::new(
1184                        writer_builder
1185                            .clone()
1186                            .build()
1187                            .await
1188                            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1189                    ));
1190                }
1191            }
1192            IcebergWriterDispatch::PartitionUpsert {
1193                writer,
1194                writer_builder,
1195                ..
1196            } => {
1197                if writer.is_none() {
1198                    *writer = Some(Box::new(
1199                        writer_builder
1200                            .clone()
1201                            .build()
1202                            .await
1203                            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1204                    ));
1205                }
1206            }
1207            IcebergWriterDispatch::NonpartitionUpsert {
1208                writer,
1209                writer_builder,
1210                ..
1211            } => {
1212                if writer.is_none() {
1213                    *writer = Some(Box::new(
1214                        writer_builder
1215                            .clone()
1216                            .build()
1217                            .await
1218                            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1219                    ));
1220                }
1221            }
1222        };
1223
1224        // Process the chunk.
1225        let (mut chunk, ops) = chunk.compact_vis().into_parts();
1226        match &self.project_idx_vec {
1227            ProjectIdxVec::None => {}
1228            ProjectIdxVec::Prepare(idx) => {
1229                if *idx >= chunk.columns().len() {
1230                    return Err(SinkError::Iceberg(anyhow!(
1231                        "invalid extra partition column index {}",
1232                        idx
1233                    )));
1234                }
1235                let project_idx_vec = (0..*idx)
1236                    .chain(*idx + 1..chunk.columns().len())
1237                    .collect_vec();
1238                chunk = chunk.project(&project_idx_vec);
1239                self.project_idx_vec = ProjectIdxVec::Done(project_idx_vec);
1240            }
1241            ProjectIdxVec::Done(idx_vec) => {
1242                chunk = chunk.project(idx_vec);
1243            }
1244        }
1245        if ops.is_empty() {
1246            return Ok(());
1247        }
1248        let write_batch_size = chunk.estimated_heap_size();
1249        let batch = match &self.writer {
1250            IcebergWriterDispatch::PartitionAppendOnly { .. }
1251            | IcebergWriterDispatch::NonPartitionAppendOnly { .. } => {
                // Keep only the insert rows: AND the visibility bitmap with an `op == Insert` bitmap.
1253                let filters =
1254                    chunk.visibility() & ops.iter().map(|op| *op == Op::Insert).collect::<Bitmap>();
1255                chunk.set_visibility(filters);
1256                IcebergArrowConvert
1257                    .to_record_batch(self.arrow_schema.clone(), &chunk.compact_vis())
1258                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?
1259            }
1260            IcebergWriterDispatch::PartitionUpsert {
1261                arrow_schema_with_op_column,
1262                ..
1263            }
1264            | IcebergWriterDispatch::NonpartitionUpsert {
1265                arrow_schema_with_op_column,
1266                ..
1267            } => {
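                // Append an extra `op` column so the equality-delta writer can tell inserts
                // from deletes, e.g. ops [Insert, Delete] map to [INSERT_OP, DELETE_OP].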
1268                let chunk = IcebergArrowConvert
1269                    .to_record_batch(self.arrow_schema.clone(), &chunk)
1270                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
1271                let ops = Arc::new(Int32Array::from(
1272                    ops.iter()
1273                        .map(|op| match op {
1274                            Op::UpdateInsert | Op::Insert => INSERT_OP,
1275                            Op::UpdateDelete | Op::Delete => DELETE_OP,
1276                        })
1277                        .collect_vec(),
1278                ));
1279                let mut columns = chunk.columns().to_vec();
1280                columns.push(ops);
1281                RecordBatch::try_new(arrow_schema_with_op_column.clone(), columns)
1282                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?
1283            }
1284        };
1285
1286        let writer = self.writer.get_writer().unwrap();
1287        writer
1288            .write(batch)
1289            .instrument_await("iceberg_write")
1290            .await
1291            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
1292        self.metrics.write_bytes.inc_by(write_batch_size as _);
1293        Ok(())
1294    }
1295
1296    /// Receive a barrier and mark the end of current epoch. When `is_checkpoint` is true, the sink
1297    /// writer should commit the current epoch.
1298    async fn barrier(&mut self, is_checkpoint: bool) -> Result<Option<SinkMetadata>> {
        // Skip if this barrier is not a checkpoint.
1300        if !is_checkpoint {
1301            return Ok(None);
1302        }
1303
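        // Close the current writer to flush and collect its data files, then rebuild a fresh
        // writer from the builder so the next epoch can continue writing.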
1304        let close_result = match &mut self.writer {
1305            IcebergWriterDispatch::PartitionAppendOnly {
1306                writer,
1307                writer_builder,
1308            } => {
1309                let close_result = match writer.take() {
1310                    Some(mut writer) => {
1311                        Some(writer.close().instrument_await("iceberg_close").await)
1312                    }
1313                    _ => None,
1314                };
1315                match writer_builder.clone().build().await {
1316                    Ok(new_writer) => {
1317                        *writer = Some(Box::new(new_writer));
1318                    }
1319                    _ => {
                        // In this case, the writer is closed and we can't build a new one. We can't return the error
                        // here because the current writer may have closed successfully, so we just log it.
1322                        warn!("Failed to build new writer after close");
1323                    }
1324                }
1325                close_result
1326            }
1327            IcebergWriterDispatch::NonPartitionAppendOnly {
1328                writer,
1329                writer_builder,
1330            } => {
1331                let close_result = match writer.take() {
1332                    Some(mut writer) => {
1333                        Some(writer.close().instrument_await("iceberg_close").await)
1334                    }
1335                    _ => None,
1336                };
1337                match writer_builder.clone().build().await {
1338                    Ok(new_writer) => {
1339                        *writer = Some(Box::new(new_writer));
1340                    }
1341                    _ => {
                        // In this case, the writer is closed and we can't build a new one. We can't return the error
                        // here because the current writer may have closed successfully, so we just log it.
1344                        warn!("Failed to build new writer after close");
1345                    }
1346                }
1347                close_result
1348            }
1349            IcebergWriterDispatch::PartitionUpsert {
1350                writer,
1351                writer_builder,
1352                ..
1353            } => {
1354                let close_result = match writer.take() {
1355                    Some(mut writer) => {
1356                        Some(writer.close().instrument_await("iceberg_close").await)
1357                    }
1358                    _ => None,
1359                };
1360                match writer_builder.clone().build().await {
1361                    Ok(new_writer) => {
1362                        *writer = Some(Box::new(new_writer));
1363                    }
1364                    _ => {
1365                        // The old writer is already closed but we failed to build a new one. We can't return the
1366                        // error here because the close itself may have succeeded, so we just log the error.
1367                        warn!("Failed to build new writer after close");
1368                    }
1369                }
1370                close_result
1371            }
1372            IcebergWriterDispatch::NonpartitionUpsert {
1373                writer,
1374                writer_builder,
1375                ..
1376            } => {
1377                let close_result = match writer.take() {
1378                    Some(mut writer) => {
1379                        Some(writer.close().instrument_await("iceberg_close").await)
1380                    }
1381                    _ => None,
1382                };
1383                match writer_builder.clone().build().await {
1384                    Ok(new_writer) => {
1385                        *writer = Some(Box::new(new_writer));
1386                    }
1387                    _ => {
1388                        // The old writer is already closed but we failed to build a new one. We can't return the
1389                        // error here because the close itself may have succeeded, so we just log the error.
1390                        warn!("Failed to build new writer after close");
1391                    }
1392                }
1393                close_result
1394            }
1395        };
1396
1397        match close_result {
1398            Some(Ok(result)) => {
1399                let version = self.table.metadata().format_version() as u8;
1400                let partition_type = self.table.metadata().default_partition_type();
1401                let data_files = result
1402                    .into_iter()
1403                    .map(|f| {
1404                        SerializedDataFile::try_from(f, partition_type, version == 1)
1405                            .map_err(|err| SinkError::Iceberg(anyhow!(err)))
1406                    })
1407                    .collect::<Result<Vec<_>>>()?;
1408                Ok(Some(SinkMetadata::try_from(&IcebergCommitResult {
1409                    data_files,
1410                    schema_id: self.table.metadata().current_schema_id(),
1411                    partition_spec_id: self.table.metadata().default_partition_spec_id(),
1412                })?))
1413            }
1414            Some(Err(err)) => Err(SinkError::Iceberg(anyhow!(err))),
1415            None => Err(SinkError::Iceberg(anyhow!("No writer to close"))),
1416        }
1417    }
1418
1419    /// Clean up
1420    async fn abort(&mut self) -> Result<()> {
1421        // TODO: abort should clean up all the data written in this epoch.
1422        Ok(())
1423    }
1424}
1425
1426const SCHEMA_ID: &str = "schema_id";
1427const PARTITION_SPEC_ID: &str = "partition_spec_id";
1428const DATA_FILES: &str = "data_files";
1429
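/// The per-writer commit payload exchanged with the commit coordinator: the data files written
/// in an epoch plus the schema id and partition spec id they were written against. It is
/// serialized as a JSON object; a rough sketch of the shape (illustrative values, not a doctest):
///
/// ```ignore
/// let _sketch = serde_json::json!({
///     "schema_id": 0,
///     "partition_spec_id": 0,
///     "data_files": [/* serde-serialized `SerializedDataFile`s */],
/// });
/// ```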
1430#[derive(Default, Clone)]
1431struct IcebergCommitResult {
1432    schema_id: i32,
1433    partition_spec_id: i32,
1434    data_files: Vec<SerializedDataFile>,
1435}
1436
1437impl IcebergCommitResult {
1438    fn try_from(value: &SinkMetadata) -> Result<Self> {
1439        if let Some(Serialized(v)) = &value.metadata {
1440            let mut values = if let serde_json::Value::Object(v) =
1441                serde_json::from_slice::<serde_json::Value>(&v.metadata)
1442                    .context("Can't parse iceberg sink metadata")?
1443            {
1444                v
1445            } else {
1446                bail!("iceberg sink metadata should be an object");
1447            };
1448
1449            let schema_id;
1450            if let Some(serde_json::Value::Number(value)) = values.remove(SCHEMA_ID) {
1451                schema_id = value
1452                    .as_u64()
1453                    .ok_or_else(|| anyhow!("schema_id should be a u64"))?;
1454            } else {
1455                bail!("iceberg sink metadata should have schema_id");
1456            }
1457
1458            let partition_spec_id;
1459            if let Some(serde_json::Value::Number(value)) = values.remove(PARTITION_SPEC_ID) {
1460                partition_spec_id = value
1461                    .as_u64()
1462                    .ok_or_else(|| anyhow!("partition_spec_id should be a u64"))?;
1463            } else {
1464                bail!("iceberg sink metadata should have partition_spec_id");
1465            }
1466
1467            let data_files: Vec<SerializedDataFile>;
1468            if let serde_json::Value::Array(values) = values
1469                .remove(DATA_FILES)
1470                .ok_or_else(|| anyhow!("iceberg sink metadata should have a data_files field"))?
1471            {
1472                data_files = values
1473                    .into_iter()
1474                    .map(from_value::<SerializedDataFile>)
1475                    .collect::<std::result::Result<_, _>>()
1476                    .context("Failed to deserialize data_files in iceberg sink metadata")?;
1477            } else {
1478                bail!("data_files in iceberg sink metadata should be an array");
1479            }
1480
1481            Ok(Self {
1482                schema_id: schema_id as i32,
1483                partition_spec_id: partition_spec_id as i32,
1484                data_files,
1485            })
1486        } else {
1487            bail!("Can't create iceberg sink write result from empty data!")
1488        }
1489    }
1490
1491    fn try_from_sealized_bytes(value: Vec<u8>) -> Result<Self> {
1492        let mut values = if let serde_json::Value::Object(value) =
1493            serde_json::from_slice::<serde_json::Value>(&value)
1494                .context("Can't parse iceberg sink metadata")?
1495        {
1496            value
1497        } else {
1498            bail!("iceberg sink metadata should be an object");
1499        };
1500
1501        let schema_id;
1502        if let Some(serde_json::Value::Number(value)) = values.remove(SCHEMA_ID) {
1503            schema_id = value
1504                .as_u64()
1505                .ok_or_else(|| anyhow!("schema_id should be a u64"))?;
1506        } else {
1507            bail!("iceberg sink metadata should have schema_id");
1508        }
1509
1510        let partition_spec_id;
1511        if let Some(serde_json::Value::Number(value)) = values.remove(PARTITION_SPEC_ID) {
1512            partition_spec_id = value
1513                .as_u64()
1514                .ok_or_else(|| anyhow!("partition_spec_id should be a u64"))?;
1515        } else {
1516            bail!("iceberg sink metadata should have partition_spec_id");
1517        }
1518
1519        let data_files: Vec<SerializedDataFile>;
1520        if let serde_json::Value::Array(values) = values
1521            .remove(DATA_FILES)
1522            .ok_or_else(|| anyhow!("iceberg sink metadata should have a data_files field"))?
1523        {
1524            data_files = values
1525                .into_iter()
1526                .map(from_value::<SerializedDataFile>)
1527                .collect::<std::result::Result<_, _>>()
1528                .context("Failed to deserialize data_files in iceberg sink metadata")?;
1529        } else {
1530            bail!("data_files in iceberg sink metadata should be an array");
1531        }
1532
1533        Ok(Self {
1534            schema_id: schema_id as i32,
1535            partition_spec_id: partition_spec_id as i32,
1536            data_files,
1537        })
1538    }
1539}
1540
1541impl<'a> TryFrom<&'a IcebergCommitResult> for SinkMetadata {
1542    type Error = SinkError;
1543
1544    fn try_from(value: &'a IcebergCommitResult) -> std::result::Result<SinkMetadata, Self::Error> {
1545        let json_data_files = serde_json::Value::Array(
1546            value
1547                .data_files
1548                .iter()
1549                .map(serde_json::to_value)
1550                .collect::<std::result::Result<Vec<serde_json::Value>, _>>()
1551                .context("Can't serialize data files to json")?,
1552        );
1553        let json_value = serde_json::Value::Object(
1554            vec![
1555                (
1556                    SCHEMA_ID.to_owned(),
1557                    serde_json::Value::Number(value.schema_id.into()),
1558                ),
1559                (
1560                    PARTITION_SPEC_ID.to_owned(),
1561                    serde_json::Value::Number(value.partition_spec_id.into()),
1562                ),
1563                (DATA_FILES.to_owned(), json_data_files),
1564            ]
1565            .into_iter()
1566            .collect(),
1567        );
1568        Ok(SinkMetadata {
1569            metadata: Some(Serialized(SerializedMetadata {
1570                metadata: serde_json::to_vec(&json_value)
1571                    .context("Can't serialize iceberg sink metadata")?,
1572            })),
1573        })
1574    }
1575}
1576
1577impl TryFrom<IcebergCommitResult> for Vec<u8> {
1578    type Error = SinkError;
1579
1580    fn try_from(value: IcebergCommitResult) -> std::result::Result<Vec<u8>, Self::Error> {
1581        let json_data_files = serde_json::Value::Array(
1582            value
1583                .data_files
1584                .iter()
1585                .map(serde_json::to_value)
1586                .collect::<std::result::Result<Vec<serde_json::Value>, _>>()
1587                .context("Can't serialize data files to json")?,
1588        );
1589        let json_value = serde_json::Value::Object(
1590            vec![
1591                (
1592                    SCHEMA_ID.to_owned(),
1593                    serde_json::Value::Number(value.schema_id.into()),
1594                ),
1595                (
1596                    PARTITION_SPEC_ID.to_owned(),
1597                    serde_json::Value::Number(value.partition_spec_id.into()),
1598                ),
1599                (DATA_FILES.to_owned(), json_data_files),
1600            ]
1601            .into_iter()
1602            .collect(),
1603        );
1604        Ok(serde_json::to_vec(&json_value).context("Can't serialize iceberg sink metadata")?)
1605    }
1606}
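
/// Commit coordinator for the iceberg sink. It collects the data files written by all parallel
/// sink writers in an epoch and commits them to the iceberg table as a single snapshot,
/// optionally with exactly-once semantics backed by pre-commit metadata in a system table.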
1607pub struct IcebergSinkCommitter {
1608    catalog: Arc<dyn Catalog>,
1609    table: Table,
1610    pub last_commit_epoch: u64,
1611    pub(crate) is_exactly_once: bool,
1612    pub(crate) sink_id: u32,
1613    pub(crate) config: IcebergConfig,
1614    pub(crate) param: SinkParam,
1615    pub(crate) db: DatabaseConnection,
1616    pub(crate) committed_epoch_subscriber: Option<SinkCommittedEpochSubscriber>,
1617    commit_retry_num: u32,
1618    pub(crate) iceberg_compact_stat_sender: Option<UnboundedSender<IcebergSinkCompactionUpdate>>,
1619}
1620
1621impl IcebergSinkCommitter {
1622    // Reload the table and guarantee that its current schema_id and partition_spec_id match the
1623    // given `schema_id` and `partition_spec_id`.
1624    async fn reload_table(
1625        catalog: &dyn Catalog,
1626        table_ident: &TableIdent,
1627        schema_id: i32,
1628        partition_spec_id: i32,
1629    ) -> Result<Table> {
1630        let table = catalog
1631            .load_table(table_ident)
1632            .await
1633            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
1634        if table.metadata().current_schema_id() != schema_id {
1635            return Err(SinkError::Iceberg(anyhow!(
1636                "Schema evolution not supported, expect schema id {}, but got {}",
1637                schema_id,
1638                table.metadata().current_schema_id()
1639            )));
1640        }
1641        if table.metadata().default_partition_spec_id() != partition_spec_id {
1642            return Err(SinkError::Iceberg(anyhow!(
1643                "Partition evolution not supported, expect partition spec id {}, but got {}",
1644                partition_spec_id,
1645                table.metadata().default_partition_spec_id()
1646            )));
1647        }
1648        Ok(table)
1649    }
1650}
1651
1652#[async_trait::async_trait]
1653impl SinkCommitCoordinator for IcebergSinkCommitter {
1654    async fn init(&mut self, subscriber: SinkCommittedEpochSubscriber) -> Result<Option<u64>> {
1655        if self.is_exactly_once {
1656            self.committed_epoch_subscriber = Some(subscriber);
1657            tracing::info!(
1658                "Sink id = {}: iceberg sink coordinator initing.",
1659                self.param.sink_id.sink_id()
1660            );
1661            if iceberg_sink_has_pre_commit_metadata(&self.db, self.param.sink_id.sink_id()).await? {
1662                let ordered_metadata_list_by_end_epoch =
1663                    get_pre_commit_info_by_sink_id(&self.db, self.param.sink_id.sink_id()).await?;
1664
1665                let mut last_recommit_epoch = 0;
1666                for (end_epoch, serialized_bytes, snapshot_id, committed) in
1667                    ordered_metadata_list_by_end_epoch
1668                {
1669                    let write_results_bytes = deserialize_metadata(serialized_bytes);
1670                    let mut write_results = vec![];
1671
1672                    for each in write_results_bytes {
1673                        let write_result = IcebergCommitResult::try_from_sealized_bytes(each)?;
1674                        write_results.push(write_result);
1675                    }
1676
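                    // Recovery decision per persisted pre-commit row:
                    //   - committed == true: already marked committed, nothing to do.
                    //   - committed == false but the snapshot exists in iceberg: only the
                    //     bookkeeping is stale, so mark the row as committed.
                    //   - committed == false and the snapshot is missing: the data never landed
                    //     in iceberg, so re-commit it.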
1677                    match (
1678                        committed,
1679                        self.is_snapshot_id_in_iceberg(&self.config, snapshot_id)
1680                            .await?,
1681                    ) {
1682                        (true, _) => {
1683                            tracing::info!(
1684                                "Sink id = {}: all data in log store has been written into external sink, do nothing when recovery.",
1685                                self.param.sink_id.sink_id()
1686                            );
1687                        }
1688                        (false, true) => {
1689                            // skip
1690                            tracing::info!(
1691                                "Sink id = {}: all pre-commit files have been successfully committed into iceberg and do not need to be committed again, mark it as committed.",
1692                                self.param.sink_id.sink_id()
1693                            );
1694                            mark_row_is_committed_by_sink_id_and_end_epoch(
1695                                &self.db,
1696                                self.sink_id,
1697                                end_epoch,
1698                            )
1699                            .await?;
1700                        }
1701                        (false, false) => {
1702                            tracing::info!(
1703                                "Sink id = {}: there are files that were not successfully committed; re-commit these files.",
1704                                self.param.sink_id.sink_id()
1705                            );
1706                            self.re_commit(end_epoch, write_results, snapshot_id)
1707                                .await?;
1708                        }
1709                    }
1710
1711                    last_recommit_epoch = end_epoch;
1712                }
1713                tracing::info!(
1714                    "Sink id = {}: iceberg commit coordinator inited.",
1715                    self.param.sink_id.sink_id()
1716                );
1717                return Ok(Some(last_recommit_epoch));
1718            } else {
1719                tracing::info!(
1720                    "Sink id = {}: init iceberg coodinator, and system table is empty.",
1721                    self.param.sink_id.sink_id()
1722                );
1723                return Ok(None);
1724            }
1725        }
1726
1727        tracing::info!("Iceberg commit coordinator inited.");
1728        return Ok(None);
1729    }
1730
1731    async fn commit(
1732        &mut self,
1733        epoch: u64,
1734        metadata: Vec<SinkMetadata>,
1735        add_columns: Option<Vec<Field>>,
1736    ) -> Result<()> {
1737        tracing::info!("Starting iceberg commit in epoch {epoch}.");
1738        if let Some(add_columns) = add_columns {
1739            return Err(anyhow!(
1740                "Iceberg sink not support add columns, but got: {:?}",
1741                add_columns
1742            )
1743            .into());
1744        }
1745
1746        let write_results: Vec<IcebergCommitResult> = metadata
1747            .iter()
1748            .map(IcebergCommitResult::try_from)
1749            .collect::<Result<Vec<IcebergCommitResult>>>()?;
1750
1751        // Skip if no data to commit
1752        if write_results.is_empty() || write_results.iter().all(|r| r.data_files.is_empty()) {
1753            tracing::debug!(?epoch, "no data to commit");
1754            return Ok(());
1755        }
1756
1757        // Guarantee that all write results have the same schema_id and partition_spec_id.
1758        if write_results
1759            .iter()
1760            .any(|r| r.schema_id != write_results[0].schema_id)
1761            || write_results
1762                .iter()
1763                .any(|r| r.partition_spec_id != write_results[0].partition_spec_id)
1764        {
1765            return Err(SinkError::Iceberg(anyhow!(
1766                "schema_id and partition_spec_id should be the same in all write results"
1767            )));
1768        }
1769
1770        // Check snapshot limit before proceeding with commit
1771        self.wait_for_snapshot_limit().await?;
1772
1773        if self.is_exactly_once {
1774            assert!(self.committed_epoch_subscriber.is_some());
1775            match self.committed_epoch_subscriber.clone() {
1776                Some(committed_epoch_subscriber) => {
1777                    // Get the latest committed_epoch and the receiver
1778                    let (committed_epoch, mut rw_futures_utilrx) =
1779                        committed_epoch_subscriber(self.param.sink_id).await?;
1780                    // The exactly-once commit process can only start after the data of the current epoch has been persisted in the log store.
1781                    if committed_epoch >= epoch {
1782                        self.commit_iceberg_inner(epoch, write_results, None)
1783                            .await?;
1784                    } else {
1785                        tracing::info!(
1786                            "Waiting for the committed epoch to rise. Current: {}, Waiting for: {}",
1787                            committed_epoch,
1788                            epoch
1789                        );
1790                        while let Some(next_committed_epoch) = rw_futures_utilrx.recv().await {
1791                            tracing::info!(
1792                                "Received next committed epoch: {}",
1793                                next_committed_epoch
1794                            );
1795                            // If next_committed_epoch meets the condition, execute the commit immediately.
1796                            if next_committed_epoch >= epoch {
1797                                self.commit_iceberg_inner(epoch, write_results, None)
1798                                    .await?;
1799                                break;
1800                            }
1801                        }
1802                    }
1803                }
1804                None => unreachable!(
1805                    "Exactly once sink must wait epoch before committing, committed_epoch_subscriber is not initialized."
1806                ),
1807            }
1808        } else {
1809            self.commit_iceberg_inner(epoch, write_results, None)
1810                .await?;
1811        }
1812
1813        Ok(())
1814    }
1815}
1816
1817/// Methods required to achieve exactly-once semantics.
1818impl IcebergSinkCommitter {
1819    async fn re_commit(
1820        &mut self,
1821        epoch: u64,
1822        write_results: Vec<IcebergCommitResult>,
1823        snapshot_id: i64,
1824    ) -> Result<()> {
1825        tracing::info!("Starting iceberg re commit in epoch {epoch}.");
1826
1827        // Skip if no data to commit
1828        if write_results.is_empty() || write_results.iter().all(|r| r.data_files.is_empty()) {
1829            tracing::debug!(?epoch, "no data to commit");
1830            return Ok(());
1831        }
1832        self.commit_iceberg_inner(epoch, write_results, Some(snapshot_id))
1833            .await?;
1834        Ok(())
1835    }
1836
1837    async fn commit_iceberg_inner(
1838        &mut self,
1839        epoch: u64,
1840        write_results: Vec<IcebergCommitResult>,
1841        snapshot_id: Option<i64>,
1842    ) -> Result<()> {
1843        // If the provided `snapshot_id` is not None, it indicates that this commit is a re-commit
1844        // occurring during the recovery phase. In this case, we need to use the `snapshot_id`
1845        // that was previously persisted in the system table to commit.
1846        let is_first_commit = snapshot_id.is_none();
1847        self.last_commit_epoch = epoch;
1848        let expect_schema_id = write_results[0].schema_id;
1849        let expect_partition_spec_id = write_results[0].partition_spec_id;
1850
1851        // Load the latest table to avoid concurrent modification, on a best-effort basis.
1852        self.table = Self::reload_table(
1853            self.catalog.as_ref(),
1854            self.table.identifier(),
1855            expect_schema_id,
1856            expect_partition_spec_id,
1857        )
1858        .await?;
1859        let Some(schema) = self.table.metadata().schema_by_id(expect_schema_id) else {
1860            return Err(SinkError::Iceberg(anyhow!(
1861                "Can't find schema by id {}",
1862                expect_schema_id
1863            )));
1864        };
1865        let Some(partition_spec) = self
1866            .table
1867            .metadata()
1868            .partition_spec_by_id(expect_partition_spec_id)
1869        else {
1870            return Err(SinkError::Iceberg(anyhow!(
1871                "Can't find partition spec by id {}",
1872                expect_partition_spec_id
1873            )));
1874        };
1875        let partition_type = partition_spec
1876            .as_ref()
1877            .clone()
1878            .partition_type(schema)
1879            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
1880
1881        let txn = Transaction::new(&self.table);
1882        // Only generate a new snapshot id on the first commit.
1883        let snapshot_id = match snapshot_id {
1884            Some(previous_snapshot_id) => previous_snapshot_id,
1885            None => txn.generate_unique_snapshot_id(),
1886        };
1887        if self.is_exactly_once && is_first_commit {
1888            // Persist the pre-commit metadata and snapshot id in the system table.
1889            let mut pre_commit_metadata_bytes = Vec::new();
1890            for each_parallelism_write_result in write_results.clone() {
1891                let each_parallelism_write_result_bytes: Vec<u8> =
1892                    each_parallelism_write_result.try_into()?;
1893                pre_commit_metadata_bytes.push(each_parallelism_write_result_bytes);
1894            }
1895
1896            let pre_commit_metadata_bytes: Vec<u8> = serialize_metadata(pre_commit_metadata_bytes);
1897
1898            persist_pre_commit_metadata(
1899                self.sink_id,
1900                self.db.clone(),
1901                self.last_commit_epoch,
1902                epoch,
1903                pre_commit_metadata_bytes,
1904                snapshot_id,
1905            )
1906            .await?;
1907        }
1908
1909        let data_files = write_results
1910            .into_iter()
1911            .flat_map(|r| {
1912                r.data_files.into_iter().map(|f| {
1913                    f.try_into(expect_partition_spec_id, &partition_type, schema)
1914                        .map_err(|err| SinkError::Iceberg(anyhow!(err)))
1915                })
1916            })
1917            .collect::<Result<Vec<DataFile>>>()?;
1918        // TODO:
1919        // This retry behavior should be reverted and handled in iceberg-rust once it supports retry (tracked in: https://github.com/apache/iceberg-rust/issues/964),
1920        // because the retry logic involves reapplying the commit metadata.
1921        // For now, we simply retry the whole commit operation.
1922        let retry_strategy = ExponentialBackoff::from_millis(10)
1923            .max_delay(Duration::from_secs(60))
1924            .map(jitter)
1925            .take(self.commit_retry_num as usize);
1926        let catalog = self.catalog.clone();
1927        let table_ident = self.table.identifier().clone();
1928        let table = Retry::spawn(retry_strategy, || async {
1929            let table = Self::reload_table(
1930                catalog.as_ref(),
1931                &table_ident,
1932                expect_schema_id,
1933                expect_partition_spec_id,
1934            )
1935            .await?;
1936            let txn = Transaction::new(&table);
1937            let mut append_action = txn
1938                .fast_append(Some(snapshot_id), None, vec![])
1939                .map_err(|err| SinkError::Iceberg(anyhow!(err)))?
1940                .with_to_branch(commit_branch(
1941                    self.config.r#type.as_str(),
1942                    self.config.write_mode.as_str(),
1943                ));
1944            append_action
1945                .add_data_files(data_files.clone())
1946                .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
1947            let tx = append_action.apply().await.map_err(|err| {
1948                tracing::error!(error = %err.as_report(), "Failed to apply iceberg table");
1949                SinkError::Iceberg(anyhow!(err))
1950            })?;
1951            tx.commit(self.catalog.as_ref()).await.map_err(|err| {
1952                tracing::error!(error = %err.as_report(), "Failed to commit iceberg table");
1953                SinkError::Iceberg(anyhow!(err))
1954            })
1955        })
1956        .await?;
1957        self.table = table;
1958
1959        let snapshot_num = self.table.metadata().snapshots().count();
1960        let catalog_name = self.config.common.catalog_name();
1961        let table_name = self.table.identifier().to_string();
1962        let metrics_labels = [&self.param.sink_name, &catalog_name, &table_name];
1963        GLOBAL_SINK_METRICS
1964            .iceberg_snapshot_num
1965            .with_guarded_label_values(&metrics_labels)
1966            .set(snapshot_num as i64);
1967
1968        tracing::info!("Succeeded to commit to iceberg table in epoch {epoch}.");
1969
1970        if self.is_exactly_once {
1971            mark_row_is_committed_by_sink_id_and_end_epoch(&self.db, self.sink_id, epoch).await?;
1972            tracing::info!(
1973                "Sink id = {}: succeeded mark pre commit metadata in epoch {} to deleted.",
1974                self.sink_id,
1975                epoch
1976            );
1977
1978            delete_row_by_sink_id_and_end_epoch(&self.db, self.sink_id, epoch).await?;
1979        }
1980
1981        if let Some(iceberg_compact_stat_sender) = &self.iceberg_compact_stat_sender
1982            && self.config.enable_compaction
1983            && iceberg_compact_stat_sender
1984                .send(IcebergSinkCompactionUpdate {
1985                    sink_id: SinkId::new(self.sink_id),
1986                    compaction_interval: self.config.compaction_interval_sec(),
1987                    force_compaction: false,
1988                })
1989                .is_err()
1990        {
1991            warn!("failed to send iceberg compaction stats");
1992        }
1993
1994        Ok(())
1995    }
1996
1997    /// When persisting pre-commit metadata, we record the `snapshot_id` corresponding to each batch of files.
1998    /// Therefore, checking whether all files in a batch are present in Iceberg boils down to
1999    /// verifying whether their corresponding `snapshot_id` exists in Iceberg.
2000    async fn is_snapshot_id_in_iceberg(
2001        &self,
2002        iceberg_config: &IcebergConfig,
2003        snapshot_id: i64,
2004    ) -> Result<bool> {
2005        let table = iceberg_config.load_table().await?;
2006        Ok(table.metadata().snapshot_by_id(snapshot_id).is_some())
2011    }
2012
2013    /// Count the snapshots created since the last rewrite/overwrite (`Replace`) operation.
2014    /// Returns the total number of snapshots if no such operation is found.
2015    fn count_snapshots_since_rewrite(&self) -> usize {
2016        let mut snapshots: Vec<_> = self.table.metadata().snapshots().collect();
2017        snapshots.sort_by_key(|b| std::cmp::Reverse(b.timestamp_ms()));
2018
2019        // Iterate through snapshots newest-first to find the last rewrite/overwrite.
2020        let mut count = 0;
2021        for snapshot in snapshots {
2022            // Check if this snapshot represents a rewrite or overwrite operation
2023            let summary = snapshot.summary();
2024            match &summary.operation {
2025                Operation::Replace => {
2026                    // Found a rewrite/overwrite operation, stop counting
2027                    break;
2028                }
2029
2030                _ => {
2031                    // Increment count for each snapshot that is not a rewrite/overwrite
2032                    count += 1;
2033                }
2034            }
2035        }
2036
2037        count
2038    }
2039
2040    /// Wait until snapshot count since last rewrite is below the limit
2041    async fn wait_for_snapshot_limit(&mut self) -> Result<()> {
2042        if !self.config.enable_compaction {
2043            return Ok(());
2044        }
2045
2046        if let Some(max_snapshots) = self.config.max_snapshots_num_before_compaction {
2047            loop {
2048                let current_count = self.count_snapshots_since_rewrite();
2049
2050                if current_count < max_snapshots {
2051                    tracing::info!(
2052                        "Snapshot count check passed: {} < {}",
2053                        current_count,
2054                        max_snapshots
2055                    );
2056                    break;
2057                }
2058
2059                tracing::info!(
2060                    "Snapshot count {} exceeds limit {}, waiting...",
2061                    current_count,
2062                    max_snapshots
2063                );
2064
2065                if let Some(iceberg_compact_stat_sender) = &self.iceberg_compact_stat_sender
2066                    && iceberg_compact_stat_sender
2067                        .send(IcebergSinkCompactionUpdate {
2068                            sink_id: SinkId::new(self.sink_id),
2069                            compaction_interval: self.config.compaction_interval_sec(),
2070                            force_compaction: true,
2071                        })
2072                        .is_err()
2073                {
2074                    tracing::warn!("failed to send iceberg compaction stats");
2075                }
2076
2077                // Wait for 30 seconds before checking again
2078                tokio::time::sleep(Duration::from_secs(30)).await;
2079
2080                // Reload table to get latest snapshots
2081                self.table = self.config.load_table().await?;
2082            }
2083        }
2084        Ok(())
2085    }
2086}
2087
2088const MAP_KEY: &str = "key";
2089const MAP_VALUE: &str = "value";
2090
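/// Collect the child-field name to RisingWave data type mapping of a nested RisingWave type
/// (struct, map, or list of structs) into `schema_fields`, and return the corresponding Arrow
/// child fields so that `check_compatibility` can recurse into them. Returns `None` when the
/// Arrow type is not a supported nested type.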
2091fn get_fields<'a>(
2092    our_field_type: &'a risingwave_common::types::DataType,
2093    data_type: &ArrowDataType,
2094    schema_fields: &mut HashMap<&'a str, &'a risingwave_common::types::DataType>,
2095) -> Option<ArrowFields> {
2096    match data_type {
2097        ArrowDataType::Struct(fields) => {
2098            match our_field_type {
2099                risingwave_common::types::DataType::Struct(struct_fields) => {
2100                    struct_fields.iter().for_each(|(name, data_type)| {
2101                        let res = schema_fields.insert(name, data_type);
2102                        // This assert is to make sure there is no duplicate field name in the schema.
2103                        assert!(res.is_none())
2104                    });
2105                }
2106                risingwave_common::types::DataType::Map(map_fields) => {
2107                    schema_fields.insert(MAP_KEY, map_fields.key());
2108                    schema_fields.insert(MAP_VALUE, map_fields.value());
2109                }
2110                risingwave_common::types::DataType::List(list) => {
2111                    list.elem()
2112                        .as_struct()
2113                        .iter()
2114                        .for_each(|(name, data_type)| {
2115                            let res = schema_fields.insert(name, data_type);
2116                            // This assert is to make sure there is no duplicate field name in the schema.
2117                            assert!(res.is_none())
2118                        });
2119                }
2120                _ => {}
2121            };
2122            Some(fields.clone())
2123        }
2124        ArrowDataType::List(field) | ArrowDataType::Map(field, _) => {
2125            get_fields(our_field_type, field.data_type(), schema_fields)
2126        }
2127        _ => None, // not a supported complex type and unlikely to show up
2128    }
2129}
2130
2131fn check_compatibility(
2132    schema_fields: HashMap<&str, &risingwave_common::types::DataType>,
2133    fields: &ArrowFields,
2134) -> anyhow::Result<bool> {
2135    for arrow_field in fields {
2136        let our_field_type = schema_fields
2137            .get(arrow_field.name().as_str())
2138            .ok_or_else(|| anyhow!("Field {} not found in our schema", arrow_field.name()))?;
2139
2140        // Iceberg source should be able to read iceberg decimal type.
2141        let converted_arrow_data_type = IcebergArrowConvert
2142            .to_arrow_field("", our_field_type)
2143            .map_err(|e| anyhow!(e))?
2144            .data_type()
2145            .clone();
2146
2147        let compatible = match (&converted_arrow_data_type, arrow_field.data_type()) {
2148            (ArrowDataType::Decimal128(_, _), ArrowDataType::Decimal128(_, _)) => true,
2149            (ArrowDataType::Binary, ArrowDataType::LargeBinary) => true,
2150            (ArrowDataType::LargeBinary, ArrowDataType::Binary) => true,
2151            (ArrowDataType::List(_), ArrowDataType::List(field))
2152            | (ArrowDataType::Map(_, _), ArrowDataType::Map(field, _)) => {
2153                let mut schema_fields = HashMap::new();
2154                get_fields(our_field_type, field.data_type(), &mut schema_fields)
2155                    .is_none_or(|fields| check_compatibility(schema_fields, &fields).unwrap())
2156            }
2157            // validate nested structs
2158            (ArrowDataType::Struct(_), ArrowDataType::Struct(fields)) => {
2159                let mut schema_fields = HashMap::new();
2160                our_field_type
2161                    .as_struct()
2162                    .iter()
2163                    .for_each(|(name, data_type)| {
2164                        let res = schema_fields.insert(name, data_type);
2165                        // This assert is to make sure there is no duplicate field name in the schema.
2166                        assert!(res.is_none())
2167                    });
2168                check_compatibility(schema_fields, fields)?
2169            }
2170            // cases where left != right (metadata, field name mismatch)
2171            //
2172            // all nested types: in iceberg `field_id` will always be present, but RW doesn't have it:
2173            // {"PARQUET:field_id": ".."}
2174            //
2175            // map: The standard name in arrow is "entries", "key", "value".
2176            // in iceberg-rs, it's called "key_value"
2177            (left, right) => left.equals_datatype(right),
2178        };
2179        if !compatible {
2180            bail!(
2181                "field {}'s type is incompatible\nRisingWave converted data type: {}\niceberg's data type: {}",
2182                arrow_field.name(),
2183                converted_arrow_data_type,
2184                arrow_field.data_type()
2185            );
2186        }
2187    }
2188    Ok(true)
2189}
2190
2191/// Try to match our schema with iceberg schema.
2192pub fn try_matches_arrow_schema(rw_schema: &Schema, arrow_schema: &ArrowSchema) -> Result<()> {
2193    if rw_schema.fields.len() != arrow_schema.fields().len() {
2194        bail!(
2195            "Schema length mismatch, risingwave is {}, and iceberg is {}",
2196            rw_schema.fields.len(),
2197            arrow_schema.fields.len()
2198        );
2199    }
2200
2201    let mut schema_fields = HashMap::new();
2202    rw_schema.fields.iter().for_each(|field| {
2203        let res = schema_fields.insert(field.name.as_str(), &field.data_type);
2204        // This assert is to make sure there is no duplicate field name in the schema.
2205        assert!(res.is_none())
2206    });
2207
2208    check_compatibility(schema_fields, &arrow_schema.fields)?;
2209    Ok(())
2210}
2211
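/// Serialize the per-writer pre-commit payloads (each one an already serialized
/// `IcebergCommitResult`) into a single byte blob via JSON; `deserialize_metadata` is the
/// inverse. A rough round-trip sketch (illustrative, not a doctest):
///
/// ```ignore
/// let blobs: Vec<Vec<u8>> = vec![vec![1, 2, 3], vec![4, 5]];
/// let bytes = serialize_metadata(blobs.clone());
/// assert_eq!(deserialize_metadata(bytes), blobs);
/// ```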
2212pub fn serialize_metadata(metadata: Vec<Vec<u8>>) -> Vec<u8> {
2213    serde_json::to_vec(&metadata).unwrap()
2214}
2215
2216pub fn deserialize_metadata(bytes: Vec<u8>) -> Vec<Vec<u8>> {
2217    serde_json::from_slice(&bytes).unwrap()
2218}
2219
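/// Parse a `partition_by` expression string such as `"v1, bucket(5,v1), day(v3)"` into pairs of
/// column name and iceberg [`Transform`]. A rough usage sketch (illustrative, not a doctest; the
/// column names are made up):
///
/// ```ignore
/// let cols = parse_partition_by_exprs("v1, bucket(5,v1), day(v3)".to_owned())?;
/// // => [("v1", Transform::Identity), ("v1", Transform::Bucket(5)), ("v3", Transform::Day)]
/// ```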
2220pub fn parse_partition_by_exprs(
2221    expr: String,
2222) -> std::result::Result<Vec<(String, Transform)>, anyhow::Error> {
2223    // captures column, transform(column), transform(n,column), transform(n, column)
2224    let re = Regex::new(r"(?<transform>\w+)(\(((?<n>\d+)?(?:,|(,\s)))?(?<field>\w+)\))?").unwrap();
2225    if !re.is_match(&expr) {
2226        bail!(format!(
2227            "Invalid partition fields: {}\nHINT: Supported formats are column, transform(column), transform(n,column), transform(n, column)",
2228            expr
2229        ))
2230    }
2231    let caps = re.captures_iter(&expr);
2232
2233    let mut partition_columns = vec![];
2234
2235    for mat in caps {
2236        let (column, transform) = if mat.name("n").is_none() && mat.name("field").is_none() {
2237            (&mat["transform"], Transform::Identity)
2238        } else {
2239            let mut func = mat["transform"].to_owned();
2240            if func == "bucket" || func == "truncate" {
2241                let n = &mat
2242                    .name("n")
2243                    .ok_or_else(|| anyhow!("`n` must be specified for `bucket` and `truncate`"))?
2244                    .as_str();
2245                func = format!("{func}[{n}]");
2246            }
2247            (
2248                &mat["field"],
2249                Transform::from_str(&func)
2250                    .with_context(|| format!("invalid transform function {}", func))?,
2251            )
2252        };
2253        partition_columns.push((column.to_owned(), transform));
2254    }
2255    Ok(partition_columns)
2256}
2257
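/// Choose the branch to commit to: the dedicated copy-on-write branch for upsert sinks running
/// in copy-on-write mode, otherwise the main branch. A minimal sketch (illustrative, not a
/// doctest):
///
/// ```ignore
/// assert_eq!(
///     commit_branch(SINK_TYPE_UPSERT, ICEBERG_WRITE_MODE_COPY_ON_WRITE),
///     ICEBERG_COW_BRANCH
/// );
/// assert_eq!(
///     commit_branch(SINK_TYPE_UPSERT, ICEBERG_WRITE_MODE_MERGE_ON_READ),
///     MAIN_BRANCH
/// );
/// ```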
2258pub fn commit_branch(sink_type: &str, write_mode: &str) -> String {
2259    if should_enable_iceberg_cow(sink_type, write_mode) {
2260        ICEBERG_COW_BRANCH.to_owned()
2261    } else {
2262        MAIN_BRANCH.to_owned()
2263    }
2264}
2265
2266pub fn should_enable_iceberg_cow(sink_type: &str, write_mode: &str) -> bool {
2267    sink_type == SINK_TYPE_UPSERT && write_mode == ICEBERG_WRITE_MODE_COPY_ON_WRITE
2268}
2269
2270#[cfg(test)]
2271mod test {
2272    use std::collections::BTreeMap;
2273
2274    use risingwave_common::array::arrow::arrow_schema_iceberg::FieldRef as ArrowFieldRef;
2275    use risingwave_common::types::{DataType, MapType, StructType};
2276
2277    use crate::connector_common::{IcebergCommon, IcebergTableIdentifier};
2278    use crate::sink::decouple_checkpoint_log_sink::ICEBERG_DEFAULT_COMMIT_CHECKPOINT_INTERVAL;
2279    use crate::sink::iceberg::{
2280        COMPACTION_INTERVAL_SEC, ENABLE_COMPACTION, ENABLE_SNAPSHOT_EXPIRATION,
2281        ICEBERG_WRITE_MODE_MERGE_ON_READ, IcebergConfig, MAX_SNAPSHOTS_NUM,
2282        SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_FILES, SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_META_DATA,
2283        SNAPSHOT_EXPIRATION_MAX_AGE_MILLIS, SNAPSHOT_EXPIRATION_RETAIN_LAST, WRITE_MODE,
2284    };
2285
2286    pub const DEFAULT_ICEBERG_COMPACTION_INTERVAL: u64 = 3600; // 1 hour
2287
2288    #[test]
2289    fn test_compatible_arrow_schema() {
2290        use arrow_schema_iceberg::{DataType as ArrowDataType, Field as ArrowField};
2291
2292        use super::*;
2293        let risingwave_schema = Schema::new(vec![
2294            Field::with_name(DataType::Int32, "a"),
2295            Field::with_name(DataType::Int32, "b"),
2296            Field::with_name(DataType::Int32, "c"),
2297        ]);
2298        let arrow_schema = ArrowSchema::new(vec![
2299            ArrowField::new("a", ArrowDataType::Int32, false),
2300            ArrowField::new("b", ArrowDataType::Int32, false),
2301            ArrowField::new("c", ArrowDataType::Int32, false),
2302        ]);
2303
2304        try_matches_arrow_schema(&risingwave_schema, &arrow_schema).unwrap();
2305
2306        let risingwave_schema = Schema::new(vec![
2307            Field::with_name(DataType::Int32, "d"),
2308            Field::with_name(DataType::Int32, "c"),
2309            Field::with_name(DataType::Int32, "a"),
2310            Field::with_name(DataType::Int32, "b"),
2311        ]);
2312        let arrow_schema = ArrowSchema::new(vec![
2313            ArrowField::new("a", ArrowDataType::Int32, false),
2314            ArrowField::new("b", ArrowDataType::Int32, false),
2315            ArrowField::new("d", ArrowDataType::Int32, false),
2316            ArrowField::new("c", ArrowDataType::Int32, false),
2317        ]);
2318        try_matches_arrow_schema(&risingwave_schema, &arrow_schema).unwrap();
2319
2320        let risingwave_schema = Schema::new(vec![
2321            Field::with_name(
2322                DataType::Struct(StructType::new(vec![
2323                    ("a1", DataType::Int32),
2324                    (
2325                        "a2",
2326                        DataType::Struct(StructType::new(vec![
2327                            ("a21", DataType::Bytea),
2328                            (
2329                                "a22",
2330                                DataType::Map(MapType::from_kv(DataType::Varchar, DataType::Jsonb)),
2331                            ),
2332                        ])),
2333                    ),
2334                ])),
2335                "a",
2336            ),
2337            Field::with_name(
2338                DataType::list(DataType::Struct(StructType::new(vec![
2339                    ("b1", DataType::Int32),
2340                    ("b2", DataType::Bytea),
2341                    (
2342                        "b3",
2343                        DataType::Map(MapType::from_kv(DataType::Varchar, DataType::Jsonb)),
2344                    ),
2345                ]))),
2346                "b",
2347            ),
2348            Field::with_name(
2349                DataType::Map(MapType::from_kv(
2350                    DataType::Varchar,
2351                    DataType::list(DataType::Struct(StructType::new([
2352                        ("c1", DataType::Int32),
2353                        ("c2", DataType::Bytea),
2354                        (
2355                            "c3",
2356                            DataType::Map(MapType::from_kv(DataType::Varchar, DataType::Jsonb)),
2357                        ),
2358                    ]))),
2359                )),
2360                "c",
2361            ),
2362        ]);
2363        let arrow_schema = ArrowSchema::new(vec![
2364            ArrowField::new(
2365                "a",
2366                ArrowDataType::Struct(ArrowFields::from(vec![
2367                    ArrowField::new("a1", ArrowDataType::Int32, false),
2368                    ArrowField::new(
2369                        "a2",
2370                        ArrowDataType::Struct(ArrowFields::from(vec![
2371                            ArrowField::new("a21", ArrowDataType::LargeBinary, false),
2372                            ArrowField::new_map(
2373                                "a22",
2374                                "entries",
2375                                ArrowFieldRef::new(ArrowField::new(
2376                                    "key",
2377                                    ArrowDataType::Utf8,
2378                                    false,
2379                                )),
2380                                ArrowFieldRef::new(ArrowField::new(
2381                                    "value",
2382                                    ArrowDataType::Utf8,
2383                                    false,
2384                                )),
2385                                false,
2386                                false,
2387                            ),
2388                        ])),
2389                        false,
2390                    ),
2391                ])),
2392                false,
2393            ),
2394            ArrowField::new(
2395                "b",
2396                ArrowDataType::List(ArrowFieldRef::new(ArrowField::new_list_field(
2397                    ArrowDataType::Struct(ArrowFields::from(vec![
2398                        ArrowField::new("b1", ArrowDataType::Int32, false),
2399                        ArrowField::new("b2", ArrowDataType::LargeBinary, false),
2400                        ArrowField::new_map(
2401                            "b3",
2402                            "entries",
2403                            ArrowFieldRef::new(ArrowField::new("key", ArrowDataType::Utf8, false)),
2404                            ArrowFieldRef::new(ArrowField::new(
2405                                "value",
2406                                ArrowDataType::Utf8,
2407                                false,
2408                            )),
2409                            false,
2410                            false,
2411                        ),
2412                    ])),
2413                    false,
2414                ))),
2415                false,
2416            ),
2417            ArrowField::new_map(
2418                "c",
2419                "entries",
2420                ArrowFieldRef::new(ArrowField::new("key", ArrowDataType::Utf8, false)),
2421                ArrowFieldRef::new(ArrowField::new(
2422                    "value",
2423                    ArrowDataType::List(ArrowFieldRef::new(ArrowField::new_list_field(
2424                        ArrowDataType::Struct(ArrowFields::from(vec![
2425                            ArrowField::new("c1", ArrowDataType::Int32, false),
2426                            ArrowField::new("c2", ArrowDataType::LargeBinary, false),
2427                            ArrowField::new_map(
2428                                "c3",
2429                                "entries",
2430                                ArrowFieldRef::new(ArrowField::new(
2431                                    "key",
2432                                    ArrowDataType::Utf8,
2433                                    false,
2434                                )),
2435                                ArrowFieldRef::new(ArrowField::new(
2436                                    "value",
2437                                    ArrowDataType::Utf8,
2438                                    false,
2439                                )),
2440                                false,
2441                                false,
2442                            ),
2443                        ])),
2444                        false,
2445                    ))),
2446                    false,
2447                )),
2448                false,
2449                false,
2450            ),
2451        ]);
2452        try_matches_arrow_schema(&risingwave_schema, &arrow_schema).unwrap();
2453    }
2454
2455    #[test]
2456    fn test_parse_iceberg_config() {
2457        let values = [
2458            ("connector", "iceberg"),
2459            ("type", "upsert"),
2460            ("primary_key", "v1"),
2461            ("partition_by", "v1, identity(v1), truncate(4,v2), bucket(5,v1), year(v3), month(v4), day(v5), hour(v6), void(v1)"),
2462            ("warehouse.path", "s3://iceberg"),
2463            ("s3.endpoint", "http://127.0.0.1:9301"),
2464            ("s3.access.key", "hummockadmin"),
2465            ("s3.secret.key", "hummockadmin"),
2466            ("s3.path.style.access", "true"),
2467            ("s3.region", "us-east-1"),
2468            ("catalog.type", "jdbc"),
2469            ("catalog.name", "demo"),
2470            ("catalog.uri", "jdbc://postgresql://postgres:5432/iceberg"),
2471            ("catalog.jdbc.user", "admin"),
2472            ("catalog.jdbc.password", "123456"),
2473            ("database.name", "demo_db"),
2474            ("table.name", "demo_table"),
2475            ("enable_compaction", "true"),
2476            ("compaction_interval_sec", "1800"),
2477            ("enable_snapshot_expiration", "true"),
2478        ]
2479        .into_iter()
2480        .map(|(k, v)| (k.to_owned(), v.to_owned()))
2481        .collect();
2482
2483        let iceberg_config = IcebergConfig::from_btreemap(values).unwrap();
2484
2485        let expected_iceberg_config = IcebergConfig {
2486            common: IcebergCommon {
2487                warehouse_path: Some("s3://iceberg".to_owned()),
2488                catalog_uri: Some("jdbc://postgresql://postgres:5432/iceberg".to_owned()),
2489                region: Some("us-east-1".to_owned()),
2490                endpoint: Some("http://127.0.0.1:9301".to_owned()),
2491                access_key: Some("hummockadmin".to_owned()),
2492                secret_key: Some("hummockadmin".to_owned()),
2493                gcs_credential: None,
2494                catalog_type: Some("jdbc".to_owned()),
2495                glue_id: None,
2496                catalog_name: Some("demo".to_owned()),
2497                path_style_access: Some(true),
2498                credential: None,
2499                oauth2_server_uri: None,
2500                scope: None,
2501                token: None,
2502                enable_config_load: None,
2503                rest_signing_name: None,
2504                rest_signing_region: None,
2505                rest_sigv4_enabled: None,
2506                hosted_catalog: None,
2507                azblob_account_name: None,
2508                azblob_account_key: None,
2509                azblob_endpoint_url: None,
2510                header: None,
2511                adlsgen2_account_name: None,
2512                adlsgen2_account_key: None,
2513                adlsgen2_endpoint: None,
2514                vended_credentials: None,
2515            },
2516            table: IcebergTableIdentifier {
2517                database_name: Some("demo_db".to_owned()),
2518                table_name: "demo_table".to_owned(),
2519            },
2520            r#type: "upsert".to_owned(),
2521            force_append_only: false,
2522            primary_key: Some(vec!["v1".to_owned()]),
2523            partition_by: Some("v1, identity(v1), truncate(4,v2), bucket(5,v1), year(v3), month(v4), day(v5), hour(v6), void(v1)".to_owned()),
2524            java_catalog_props: [("jdbc.user", "admin"), ("jdbc.password", "123456")]
2525                .into_iter()
2526                .map(|(k, v)| (k.to_owned(), v.to_owned()))
2527                .collect(),
2528            commit_checkpoint_interval: ICEBERG_DEFAULT_COMMIT_CHECKPOINT_INTERVAL,
2529            create_table_if_not_exists: false,
2530            is_exactly_once: Some(true),
2531            commit_retry_num: 8,
2532            enable_compaction: true,
2533            compaction_interval_sec: Some(DEFAULT_ICEBERG_COMPACTION_INTERVAL / 2),
2534            enable_snapshot_expiration: true,
2535            write_mode: ICEBERG_WRITE_MODE_MERGE_ON_READ.to_owned(),
2536            snapshot_expiration_max_age_millis: None,
2537            snapshot_expiration_retain_last: None,
2538            snapshot_expiration_clear_expired_files: true,
2539            snapshot_expiration_clear_expired_meta_data: true,
2540            max_snapshots_num_before_compaction: None,
2541        };
2542
2543        assert_eq!(iceberg_config, expected_iceberg_config);
2544
2545        assert_eq!(
2546            &iceberg_config.full_table_name().unwrap().to_string(),
2547            "demo_db.demo_table"
2548        );
2549    }
2550
2551    async fn test_create_catalog(configs: BTreeMap<String, String>) {
2552        let iceberg_config = IcebergConfig::from_btreemap(configs).unwrap();
2553
2554        let table = iceberg_config.load_table().await.unwrap();
2555
2556        println!("{:?}", table.identifier());
2557    }
2558
2559    #[tokio::test]
2560    #[ignore]
2561    async fn test_storage_catalog() {
2562        let values = [
2563            ("connector", "iceberg"),
2564            ("type", "append-only"),
2565            ("force_append_only", "true"),
2566            ("s3.endpoint", "http://127.0.0.1:9301"),
2567            ("s3.access.key", "hummockadmin"),
2568            ("s3.secret.key", "hummockadmin"),
2569            ("s3.region", "us-east-1"),
2570            ("s3.path.style.access", "true"),
2571            ("catalog.name", "demo"),
2572            ("catalog.type", "storage"),
2573            ("warehouse.path", "s3://icebergdata/demo"),
2574            ("database.name", "s1"),
2575            ("table.name", "t1"),
2576        ]
2577        .into_iter()
2578        .map(|(k, v)| (k.to_owned(), v.to_owned()))
2579        .collect();
2580
2581        test_create_catalog(values).await;
2582    }
2583
2584    #[tokio::test]
2585    #[ignore]
2586    async fn test_rest_catalog() {
2587        let values = [
2588            ("connector", "iceberg"),
2589            ("type", "append-only"),
2590            ("force_append_only", "true"),
2591            ("s3.endpoint", "http://127.0.0.1:9301"),
2592            ("s3.access.key", "hummockadmin"),
2593            ("s3.secret.key", "hummockadmin"),
2594            ("s3.region", "us-east-1"),
2595            ("s3.path.style.access", "true"),
2596            ("catalog.name", "demo"),
2597            ("catalog.type", "rest"),
2598            ("catalog.uri", "http://192.168.167.4:8181"),
2599            ("warehouse.path", "s3://icebergdata/demo"),
2600            ("database.name", "s1"),
2601            ("table.name", "t1"),
2602        ]
2603        .into_iter()
2604        .map(|(k, v)| (k.to_owned(), v.to_owned()))
2605        .collect();
2606
2607        test_create_catalog(values).await;
2608    }
2609
2610    #[tokio::test]
2611    #[ignore]
2612    async fn test_jdbc_catalog() {
2613        let values = [
2614            ("connector", "iceberg"),
2615            ("type", "append-only"),
2616            ("force_append_only", "true"),
2617            ("s3.endpoint", "http://127.0.0.1:9301"),
2618            ("s3.access.key", "hummockadmin"),
2619            ("s3.secret.key", "hummockadmin"),
2620            ("s3.region", "us-east-1"),
2621            ("s3.path.style.access", "true"),
2622            ("catalog.name", "demo"),
2623            ("catalog.type", "jdbc"),
2624            ("catalog.uri", "jdbc:postgresql://localhost:5432/iceberg"),
2625            ("catalog.jdbc.user", "admin"),
2626            ("catalog.jdbc.password", "123456"),
2627            ("warehouse.path", "s3://icebergdata/demo"),
2628            ("database.name", "s1"),
2629            ("table.name", "t1"),
2630        ]
2631        .into_iter()
2632        .map(|(k, v)| (k.to_owned(), v.to_owned()))
2633        .collect();
2634
2635        test_create_catalog(values).await;
2636    }
2637
2638    #[tokio::test]
2639    #[ignore]
2640    async fn test_hive_catalog() {
2641        let values = [
2642            ("connector", "iceberg"),
2643            ("type", "append-only"),
2644            ("force_append_only", "true"),
2645            ("s3.endpoint", "http://127.0.0.1:9301"),
2646            ("s3.access.key", "hummockadmin"),
2647            ("s3.secret.key", "hummockadmin"),
2648            ("s3.region", "us-east-1"),
2649            ("s3.path.style.access", "true"),
2650            ("catalog.name", "demo"),
2651            ("catalog.type", "hive"),
2652            ("catalog.uri", "thrift://localhost:9083"),
2653            ("warehouse.path", "s3://icebergdata/demo"),
2654            ("database.name", "s1"),
2655            ("table.name", "t1"),
2656        ]
2657        .into_iter()
2658        .map(|(k, v)| (k.to_owned(), v.to_owned()))
2659        .collect();
2660
2661        test_create_catalog(values).await;
2662    }
2663
2664    #[test]
2665    fn test_config_constants_consistency() {
2666        // This test ensures our constants match the expected configuration names
2667        // If you change a constant, this test will remind you to update both places
2668        assert_eq!(ENABLE_COMPACTION, "enable_compaction");
2669        assert_eq!(COMPACTION_INTERVAL_SEC, "compaction_interval_sec");
2670        assert_eq!(ENABLE_SNAPSHOT_EXPIRATION, "enable_snapshot_expiration");
2671        assert_eq!(WRITE_MODE, "write_mode");
2672        assert_eq!(
2673            SNAPSHOT_EXPIRATION_RETAIN_LAST,
2674            "snapshot_expiration_retain_last"
2675        );
2676        assert_eq!(
2677            SNAPSHOT_EXPIRATION_MAX_AGE_MILLIS,
2678            "snapshot_expiration_max_age_millis"
2679        );
2680        assert_eq!(
2681            SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_FILES,
2682            "snapshot_expiration_clear_expired_files"
2683        );
2684        assert_eq!(
2685            SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_META_DATA,
2686            "snapshot_expiration_clear_expired_meta_data"
2687        );
2688        assert_eq!(MAX_SNAPSHOTS_NUM, "max_snapshots_num_before_compaction");
2689    }
2690}