risingwave_connector/sink/iceberg/
mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub mod exactly_once_util;
16mod prometheus;
17use std::collections::{BTreeMap, HashMap};
18use std::fmt::Debug;
19use std::num::NonZeroU64;
20use std::str::FromStr;
21use std::sync::Arc;
22use std::time::Duration;
23
24use anyhow::{Context, anyhow};
25use async_trait::async_trait;
26use await_tree::InstrumentAwait;
27use iceberg::arrow::{arrow_schema_to_schema, schema_to_arrow_schema};
28use iceberg::spec::{
29    DataFile, MAIN_BRANCH, Operation, SerializedDataFile, Transform, UnboundPartitionField,
30    UnboundPartitionSpec,
31};
32use iceberg::table::Table;
33use iceberg::transaction::Transaction;
34use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
35use iceberg::writer::base_writer::equality_delete_writer::{
36    EqualityDeleteFileWriterBuilder, EqualityDeleteWriterConfig,
37};
38use iceberg::writer::base_writer::sort_position_delete_writer::{
39    POSITION_DELETE_SCHEMA, SortPositionDeleteWriterBuilder,
40};
41use iceberg::writer::file_writer::ParquetWriterBuilder;
42use iceberg::writer::file_writer::location_generator::{
43    DefaultFileNameGenerator, DefaultLocationGenerator,
44};
45use iceberg::writer::function_writer::equality_delta_writer::{
46    DELETE_OP, EqualityDeltaWriterBuilder, INSERT_OP,
47};
48use iceberg::writer::function_writer::fanout_partition_writer::FanoutPartitionWriterBuilder;
49use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
50use iceberg::{Catalog, NamespaceIdent, TableCreation, TableIdent};
51use itertools::Itertools;
52use parquet::file::properties::WriterProperties;
53use prometheus::monitored_general_writer::MonitoredGeneralWriterBuilder;
54use prometheus::monitored_position_delete_writer::MonitoredPositionDeleteWriterBuilder;
55use regex::Regex;
56use risingwave_common::array::arrow::arrow_array_iceberg::{Int32Array, RecordBatch};
57use risingwave_common::array::arrow::arrow_schema_iceberg::{
58    self, DataType as ArrowDataType, Field as ArrowField, Fields as ArrowFields,
59    Schema as ArrowSchema, SchemaRef,
60};
61use risingwave_common::array::arrow::{IcebergArrowConvert, IcebergCreateTableArrowConvert};
62use risingwave_common::array::{Op, StreamChunk};
63use risingwave_common::bail;
64use risingwave_common::bitmap::Bitmap;
65use risingwave_common::catalog::{Field, Schema};
66use risingwave_common::metrics::{LabelGuardedHistogram, LabelGuardedIntCounter};
67use risingwave_common_estimate_size::EstimateSize;
68use risingwave_pb::connector_service::SinkMetadata;
69use risingwave_pb::connector_service::sink_metadata::Metadata::Serialized;
70use risingwave_pb::connector_service::sink_metadata::SerializedMetadata;
71use sea_orm::DatabaseConnection;
72use serde::Deserialize;
73use serde_json::from_value;
74use serde_with::{DisplayFromStr, serde_as};
75use thiserror_ext::AsReport;
76use tokio::sync::mpsc::UnboundedSender;
77use tokio_retry::Retry;
78use tokio_retry::strategy::{ExponentialBackoff, jitter};
79use tracing::warn;
80use url::Url;
81use uuid::Uuid;
82use with_options::WithOptions;
83
84use super::decouple_checkpoint_log_sink::default_commit_checkpoint_interval;
85use super::{
86    GLOBAL_SINK_METRICS, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT, Sink,
87    SinkCommittedEpochSubscriber, SinkError, SinkWriterParam,
88};
89use crate::connector_common::{IcebergCommon, IcebergSinkCompactionUpdate};
90use crate::enforce_secret::EnforceSecret;
91use crate::sink::catalog::SinkId;
92use crate::sink::coordinate::CoordinatedLogSinker;
93use crate::sink::iceberg::exactly_once_util::*;
94use crate::sink::writer::SinkWriter;
95use crate::sink::{Result, SinkCommitCoordinator, SinkParam};
96use crate::{deserialize_bool_from_string, deserialize_optional_string_seq_from_string};
97
98pub const ICEBERG_SINK: &str = "iceberg";
99pub const ICEBERG_COW_BRANCH: &str = "ingestion";
100pub const ICEBERG_WRITE_MODE_MERGE_ON_READ: &str = "merge-on-read";
101pub const ICEBERG_WRITE_MODE_COPY_ON_WRITE: &str = "copy-on-write";
102
103// Configuration constants
104pub const ENABLE_COMPACTION: &str = "enable_compaction";
105pub const COMPACTION_INTERVAL_SEC: &str = "compaction_interval_sec";
106pub const ENABLE_SNAPSHOT_EXPIRATION: &str = "enable_snapshot_expiration";
107pub const WRITE_MODE: &str = "write_mode";
108pub const SNAPSHOT_EXPIRATION_RETAIN_LAST: &str = "snapshot_expiration_retain_last";
109pub const SNAPSHOT_EXPIRATION_MAX_AGE_MILLIS: &str = "snapshot_expiration_max_age_millis";
110pub const SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_FILES: &str = "snapshot_expiration_clear_expired_files";
111pub const SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_META_DATA: &str =
112    "snapshot_expiration_clear_expired_meta_data";
113pub const MAX_SNAPSHOTS_NUM: &str = "max_snapshots_num_before_compaction";
114
115fn default_commit_retry_num() -> u32 {
116    8
117}
118
119fn default_iceberg_write_mode() -> String {
120    ICEBERG_WRITE_MODE_MERGE_ON_READ.to_owned()
121}
122
123fn default_true() -> bool {
124    true
125}
126
127#[serde_as]
128#[derive(Debug, Clone, PartialEq, Eq, Deserialize, WithOptions)]
129pub struct IcebergConfig {
130    pub r#type: String, // accept "append-only" or "upsert"
131
132    #[serde(default, deserialize_with = "deserialize_bool_from_string")]
133    pub force_append_only: bool,
134
135    #[serde(flatten)]
136    common: IcebergCommon,
137
138    #[serde(
139        rename = "primary_key",
140        default,
141        deserialize_with = "deserialize_optional_string_seq_from_string"
142    )]
143    pub primary_key: Option<Vec<String>>,
144
145    // Props for java catalog props.
146    #[serde(skip)]
147    pub java_catalog_props: HashMap<String, String>,
148
149    #[serde(default)]
150    pub partition_by: Option<String>,
151
152    /// Commit every n(>0) checkpoints, default is 10.
153    #[serde(default = "default_commit_checkpoint_interval")]
154    #[serde_as(as = "DisplayFromStr")]
155    #[with_option(allow_alter_on_fly)]
156    pub commit_checkpoint_interval: u64,
157
158    #[serde(default, deserialize_with = "deserialize_bool_from_string")]
159    pub create_table_if_not_exists: bool,
160
161    /// Whether it is `exactly_once`, the default is not.
162    #[serde(default)]
163    #[serde_as(as = "Option<DisplayFromStr>")]
164    pub is_exactly_once: Option<bool>,
165    // Retry commit num when iceberg commit fail. default is 8.
166    // # TODO
167    // Iceberg table may store the retry commit num in table meta.
168    // We should try to find and use that as default commit retry num first.
169    #[serde(default = "default_commit_retry_num")]
170    pub commit_retry_num: u32,
171
172    /// Whether to enable iceberg compaction.
173    #[serde(
174        rename = "enable_compaction",
175        default,
176        deserialize_with = "deserialize_bool_from_string"
177    )]
178    #[with_option(allow_alter_on_fly)]
179    pub enable_compaction: bool,
180
181    /// The interval of iceberg compaction
182    #[serde(rename = "compaction_interval_sec", default)]
183    #[serde_as(as = "Option<DisplayFromStr>")]
184    #[with_option(allow_alter_on_fly)]
185    pub compaction_interval_sec: Option<u64>,
186
187    /// Whether to enable iceberg expired snapshots.
188    #[serde(
189        rename = "enable_snapshot_expiration",
190        default,
191        deserialize_with = "deserialize_bool_from_string"
192    )]
193    #[with_option(allow_alter_on_fly)]
194    pub enable_snapshot_expiration: bool,
195
196    /// The iceberg write mode, can be `merge-on-read` or `copy-on-write`.
197    #[serde(rename = "write_mode", default = "default_iceberg_write_mode")]
198    pub write_mode: String,
199
200    /// The maximum age (in milliseconds) for snapshots before they expire
201    /// For example, if set to 3600000, snapshots older than 1 hour will be expired
202    #[serde(rename = "snapshot_expiration_max_age_millis", default)]
203    #[serde_as(as = "Option<DisplayFromStr>")]
204    #[with_option(allow_alter_on_fly)]
205    pub snapshot_expiration_max_age_millis: Option<i64>,
206
207    /// The number of snapshots to retain
208    #[serde(rename = "snapshot_expiration_retain_last", default)]
209    #[serde_as(as = "Option<DisplayFromStr>")]
210    #[with_option(allow_alter_on_fly)]
211    pub snapshot_expiration_retain_last: Option<i32>,
212
213    #[serde(
214        rename = "snapshot_expiration_clear_expired_files",
215        default = "default_true",
216        deserialize_with = "deserialize_bool_from_string"
217    )]
218    #[with_option(allow_alter_on_fly)]
219    pub snapshot_expiration_clear_expired_files: bool,
220
221    #[serde(
222        rename = "snapshot_expiration_clear_expired_meta_data",
223        default = "default_true",
224        deserialize_with = "deserialize_bool_from_string"
225    )]
226    #[with_option(allow_alter_on_fly)]
227    pub snapshot_expiration_clear_expired_meta_data: bool,
228
229    /// The maximum number of snapshots allowed since the last rewrite operation
230    /// If set, sink will check snapshot count and wait if exceeded
231    #[serde(rename = "max_snapshots_num_before_compaction", default)]
232    #[serde_as(as = "Option<DisplayFromStr>")]
233    #[with_option(allow_alter_on_fly)]
234    pub max_snapshots_num_before_compaction: Option<usize>,
235}
236
237impl EnforceSecret for IcebergConfig {
238    fn enforce_secret<'a>(
239        prop_iter: impl Iterator<Item = &'a str>,
240    ) -> crate::error::ConnectorResult<()> {
241        for prop in prop_iter {
242            IcebergCommon::enforce_one(prop)?;
243        }
244        Ok(())
245    }
246
247    fn enforce_one(prop: &str) -> crate::error::ConnectorResult<()> {
248        IcebergCommon::enforce_one(prop)
249    }
250}
251
252impl IcebergConfig {
253    pub fn from_btreemap(values: BTreeMap<String, String>) -> Result<Self> {
254        let mut config =
255            serde_json::from_value::<IcebergConfig>(serde_json::to_value(&values).unwrap())
256                .map_err(|e| SinkError::Config(anyhow!(e)))?;
257
258        if config.r#type != SINK_TYPE_APPEND_ONLY && config.r#type != SINK_TYPE_UPSERT {
259            return Err(SinkError::Config(anyhow!(
260                "`{}` must be {}, or {}",
261                SINK_TYPE_OPTION,
262                SINK_TYPE_APPEND_ONLY,
263                SINK_TYPE_UPSERT
264            )));
265        }
266
267        if config.r#type == SINK_TYPE_UPSERT {
268            if let Some(primary_key) = &config.primary_key {
269                if primary_key.is_empty() {
270                    return Err(SinkError::Config(anyhow!(
271                        "`primary_key` must not be empty in {}",
272                        SINK_TYPE_UPSERT
273                    )));
274                }
275            } else {
276                return Err(SinkError::Config(anyhow!(
277                    "Must set `primary_key` in {}",
278                    SINK_TYPE_UPSERT
279                )));
280            }
281        }
282
283        // All configs start with "catalog." will be treated as java configs.
284        config.java_catalog_props = values
285            .iter()
286            .filter(|(k, _v)| {
287                k.starts_with("catalog.")
288                    && k != &"catalog.uri"
289                    && k != &"catalog.type"
290                    && k != &"catalog.name"
291                    && k != &"catalog.header"
292            })
293            .map(|(k, v)| (k[8..].to_string(), v.clone()))
294            .collect();
295
296        if config.commit_checkpoint_interval == 0 {
297            return Err(SinkError::Config(anyhow!(
298                "`commit_checkpoint_interval` must be greater than 0"
299            )));
300        }
301
302        Ok(config)
303    }
304
305    pub fn catalog_type(&self) -> &str {
306        self.common.catalog_type()
307    }
308
309    pub async fn load_table(&self) -> Result<Table> {
310        self.common
311            .load_table(&self.java_catalog_props)
312            .await
313            .map_err(Into::into)
314    }
315
316    pub async fn create_catalog(&self) -> Result<Arc<dyn Catalog>> {
317        self.common
318            .create_catalog(&self.java_catalog_props)
319            .await
320            .map_err(Into::into)
321    }
322
323    pub fn full_table_name(&self) -> Result<TableIdent> {
324        self.common.full_table_name().map_err(Into::into)
325    }
326
327    pub fn catalog_name(&self) -> String {
328        self.common.catalog_name()
329    }
330
331    pub fn compaction_interval_sec(&self) -> u64 {
332        // default to 1 hour
333        self.compaction_interval_sec.unwrap_or(3600)
334    }
335
336    /// Calculate the timestamp (in milliseconds) before which snapshots should be expired
337    /// Returns `current_time_ms` - `max_age_millis`
338    pub fn snapshot_expiration_timestamp_ms(&self, current_time_ms: i64) -> Option<i64> {
339        self.snapshot_expiration_max_age_millis
340            .map(|max_age_millis| current_time_ms - max_age_millis)
341    }
342}
343
344pub struct IcebergSink {
345    config: IcebergConfig,
346    param: SinkParam,
347    // In upsert mode, it never be None and empty.
348    unique_column_ids: Option<Vec<usize>>,
349}
350
351impl EnforceSecret for IcebergSink {
352    fn enforce_secret<'a>(
353        prop_iter: impl Iterator<Item = &'a str>,
354    ) -> crate::error::ConnectorResult<()> {
355        for prop in prop_iter {
356            IcebergConfig::enforce_one(prop)?;
357        }
358        Ok(())
359    }
360}
361
362impl TryFrom<SinkParam> for IcebergSink {
363    type Error = SinkError;
364
365    fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
366        let config = IcebergConfig::from_btreemap(param.properties.clone())?;
367        IcebergSink::new(config, param)
368    }
369}
370
371impl Debug for IcebergSink {
372    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
373        f.debug_struct("IcebergSink")
374            .field("config", &self.config)
375            .finish()
376    }
377}
378
379impl IcebergSink {
380    async fn create_and_validate_table(&self) -> Result<Table> {
381        if self.config.create_table_if_not_exists {
382            self.create_table_if_not_exists().await?;
383        }
384
385        let table = self
386            .config
387            .load_table()
388            .await
389            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
390
391        let sink_schema = self.param.schema();
392        let iceberg_arrow_schema = schema_to_arrow_schema(table.metadata().current_schema())
393            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
394
395        try_matches_arrow_schema(&sink_schema, &iceberg_arrow_schema)
396            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
397
398        Ok(table)
399    }
400
401    async fn create_table_if_not_exists(&self) -> Result<()> {
402        let catalog = self.config.create_catalog().await?;
403        let namespace = if let Some(database_name) = &self.config.common.database_name {
404            let namespace = NamespaceIdent::new(database_name.clone());
405            if !catalog
406                .namespace_exists(&namespace)
407                .await
408                .map_err(|e| SinkError::Iceberg(anyhow!(e)))?
409            {
410                catalog
411                    .create_namespace(&namespace, HashMap::default())
412                    .await
413                    .map_err(|e| SinkError::Iceberg(anyhow!(e)))
414                    .context("failed to create iceberg namespace")?;
415            }
416            namespace
417        } else {
418            bail!("database name must be set if you want to create table")
419        };
420
421        let table_id = self
422            .config
423            .full_table_name()
424            .context("Unable to parse table name")?;
425        if !catalog
426            .table_exists(&table_id)
427            .await
428            .map_err(|e| SinkError::Iceberg(anyhow!(e)))?
429        {
430            let iceberg_create_table_arrow_convert = IcebergCreateTableArrowConvert::default();
431            // convert risingwave schema -> arrow schema -> iceberg schema
432            let arrow_fields = self
433                .param
434                .columns
435                .iter()
436                .map(|column| {
437                    Ok(iceberg_create_table_arrow_convert
438                        .to_arrow_field(&column.name, &column.data_type)
439                        .map_err(|e| SinkError::Iceberg(anyhow!(e)))
440                        .context(format!(
441                            "failed to convert {}: {} to arrow type",
442                            &column.name, &column.data_type
443                        ))?)
444                })
445                .collect::<Result<Vec<ArrowField>>>()?;
446            let arrow_schema = arrow_schema_iceberg::Schema::new(arrow_fields);
447            let iceberg_schema = iceberg::arrow::arrow_schema_to_schema(&arrow_schema)
448                .map_err(|e| SinkError::Iceberg(anyhow!(e)))
449                .context("failed to convert arrow schema to iceberg schema")?;
450
451            let location = {
452                let mut names = namespace.clone().inner();
453                names.push(self.config.common.table_name.clone());
454                match &self.config.common.warehouse_path {
455                    Some(warehouse_path) => {
456                        let is_s3_tables = warehouse_path.starts_with("arn:aws:s3tables");
457                        let url = Url::parse(warehouse_path);
458                        if url.is_err() || is_s3_tables {
459                            // For rest catalog, the warehouse_path could be a warehouse name.
460                            // In this case, we should specify the location when creating a table.
461                            if self.config.common.catalog_type() == "rest"
462                                || self.config.common.catalog_type() == "rest_rust"
463                            {
464                                None
465                            } else {
466                                bail!(format!("Invalid warehouse path: {}", warehouse_path))
467                            }
468                        } else if warehouse_path.ends_with('/') {
469                            Some(format!("{}{}", warehouse_path, names.join("/")))
470                        } else {
471                            Some(format!("{}/{}", warehouse_path, names.join("/")))
472                        }
473                    }
474                    None => None,
475                }
476            };
477
478            let partition_spec = match &self.config.partition_by {
479                Some(partition_by) => {
480                    let mut partition_fields = Vec::<UnboundPartitionField>::new();
481                    for (i, (column, transform)) in parse_partition_by_exprs(partition_by.clone())?
482                        .into_iter()
483                        .enumerate()
484                    {
485                        match iceberg_schema.field_id_by_name(&column) {
486                            Some(id) => partition_fields.push(
487                                UnboundPartitionField::builder()
488                                    .source_id(id)
489                                    .transform(transform)
490                                    .name(format!("_p_{}", column))
491                                    .field_id(i as i32)
492                                    .build(),
493                            ),
494                            None => bail!(format!(
495                                "Partition source column does not exist in schema: {}",
496                                column
497                            )),
498                        };
499                    }
500                    Some(
501                        UnboundPartitionSpec::builder()
502                            .with_spec_id(0)
503                            .add_partition_fields(partition_fields)
504                            .map_err(|e| SinkError::Iceberg(anyhow!(e)))
505                            .context("failed to add partition columns")?
506                            .build(),
507                    )
508                }
509                None => None,
510            };
511
512            let table_creation_builder = TableCreation::builder()
513                .name(self.config.common.table_name.clone())
514                .schema(iceberg_schema);
515
516            let table_creation = match (location, partition_spec) {
517                (Some(location), Some(partition_spec)) => table_creation_builder
518                    .location(location)
519                    .partition_spec(partition_spec)
520                    .build(),
521                (Some(location), None) => table_creation_builder.location(location).build(),
522                (None, Some(partition_spec)) => table_creation_builder
523                    .partition_spec(partition_spec)
524                    .build(),
525                (None, None) => table_creation_builder.build(),
526            };
527
528            catalog
529                .create_table(&namespace, table_creation)
530                .await
531                .map_err(|e| SinkError::Iceberg(anyhow!(e)))
532                .context("failed to create iceberg table")?;
533        }
534        Ok(())
535    }
536
537    pub fn new(config: IcebergConfig, param: SinkParam) -> Result<Self> {
538        let unique_column_ids = if config.r#type == SINK_TYPE_UPSERT && !config.force_append_only {
539            if let Some(pk) = &config.primary_key {
540                let mut unique_column_ids = Vec::with_capacity(pk.len());
541                for col_name in pk {
542                    let id = param
543                        .columns
544                        .iter()
545                        .find(|col| col.name.as_str() == col_name)
546                        .ok_or_else(|| {
547                            SinkError::Config(anyhow!(
548                                "Primary key column {} not found in sink schema",
549                                col_name
550                            ))
551                        })?
552                        .column_id
553                        .get_id() as usize;
554                    unique_column_ids.push(id);
555                }
556                Some(unique_column_ids)
557            } else {
558                unreachable!()
559            }
560        } else {
561            None
562        };
563        Ok(Self {
564            config,
565            param,
566            unique_column_ids,
567        })
568    }
569}
570
571impl Sink for IcebergSink {
572    type Coordinator = IcebergSinkCommitter;
573    type LogSinker = CoordinatedLogSinker<IcebergSinkWriter>;
574
575    const SINK_NAME: &'static str = ICEBERG_SINK;
576
577    async fn validate(&self) -> Result<()> {
578        if "snowflake".eq_ignore_ascii_case(self.config.catalog_type()) {
579            bail!("Snowflake catalog only supports iceberg sources");
580        }
581        if "glue".eq_ignore_ascii_case(self.config.catalog_type()) {
582            risingwave_common::license::Feature::IcebergSinkWithGlue
583                .check_available()
584                .map_err(|e| anyhow::anyhow!(e))?;
585        }
586
587        let _ = self.create_and_validate_table().await?;
588        Ok(())
589    }
590
591    fn validate_alter_config(config: &BTreeMap<String, String>) -> Result<()> {
592        let iceberg_config = IcebergConfig::from_btreemap(config.clone())?;
593
594        if let Some(compaction_interval) = iceberg_config.compaction_interval_sec {
595            if iceberg_config.enable_compaction && compaction_interval == 0 {
596                bail!(
597                    "`compaction_interval_sec` must be greater than 0 when `enable_compaction` is true"
598                );
599            } else {
600                // log compaction_interval
601                tracing::info!(
602                    "Alter config compaction_interval set to {} seconds",
603                    compaction_interval
604                );
605            }
606        }
607
608        if let Some(max_snapshots) = iceberg_config.max_snapshots_num_before_compaction
609            && max_snapshots < 1
610        {
611            bail!(
612                "`max_snapshots_num_before_compaction` must be greater than 0, got: {}",
613                max_snapshots
614            );
615        }
616
617        Ok(())
618    }
619
620    async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
621        let table = self.create_and_validate_table().await?;
622        let inner = if let Some(unique_column_ids) = &self.unique_column_ids {
623            IcebergSinkWriter::new_upsert(table, unique_column_ids.clone(), &writer_param).await?
624        } else {
625            IcebergSinkWriter::new_append_only(table, &writer_param).await?
626        };
627
628        let commit_checkpoint_interval =
629            NonZeroU64::new(self.config.commit_checkpoint_interval).expect(
630                "commit_checkpoint_interval should be greater than 0, and it should be checked in config validation",
631            );
632        let writer = CoordinatedLogSinker::new(
633            &writer_param,
634            self.param.clone(),
635            inner,
636            commit_checkpoint_interval,
637        )
638        .await?;
639
640        Ok(writer)
641    }
642
643    fn is_coordinated_sink(&self) -> bool {
644        true
645    }
646
647    async fn new_coordinator(
648        &self,
649        db: DatabaseConnection,
650        iceberg_compact_stat_sender: Option<UnboundedSender<IcebergSinkCompactionUpdate>>,
651    ) -> Result<Self::Coordinator> {
652        let catalog = self.config.create_catalog().await?;
653        let table = self.create_and_validate_table().await?;
654        Ok(IcebergSinkCommitter {
655            catalog,
656            table,
657            is_exactly_once: self.config.is_exactly_once.unwrap_or_default(),
658            last_commit_epoch: 0,
659            sink_id: self.param.sink_id.sink_id(),
660            config: self.config.clone(),
661            param: self.param.clone(),
662            db,
663            commit_retry_num: self.config.commit_retry_num,
664            committed_epoch_subscriber: None,
665            iceberg_compact_stat_sender,
666        })
667    }
668}
669
670/// None means no project.
671/// Prepare represent the extra partition column idx.
672/// Done represents the project idx vec.
673///
674/// The `ProjectIdxVec` will be late-evaluated. When we encounter the Prepare state first, we will use the data chunk schema
675/// to create the project idx vec.
676enum ProjectIdxVec {
677    None,
678    Prepare(usize),
679    Done(Vec<usize>),
680}
681
682pub struct IcebergSinkWriter {
683    writer: IcebergWriterDispatch,
684    arrow_schema: SchemaRef,
685    // See comments below
686    metrics: IcebergWriterMetrics,
687    // State of iceberg table for this writer
688    table: Table,
689    // For chunk with extra partition column, we should remove this column before write.
690    // This project index vec is used to avoid create project idx each time.
691    project_idx_vec: ProjectIdxVec,
692}
693
694#[allow(clippy::type_complexity)]
695enum IcebergWriterDispatch {
696    PartitionAppendOnly {
697        writer: Option<Box<dyn IcebergWriter>>,
698        writer_builder: MonitoredGeneralWriterBuilder<
699            FanoutPartitionWriterBuilder<
700                DataFileWriterBuilder<
701                    ParquetWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>,
702                >,
703            >,
704        >,
705    },
706    NonPartitionAppendOnly {
707        writer: Option<Box<dyn IcebergWriter>>,
708        writer_builder: MonitoredGeneralWriterBuilder<
709            DataFileWriterBuilder<
710                ParquetWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>,
711            >,
712        >,
713    },
714    PartitionUpsert {
715        writer: Option<Box<dyn IcebergWriter>>,
716        writer_builder: MonitoredGeneralWriterBuilder<
717            FanoutPartitionWriterBuilder<
718                EqualityDeltaWriterBuilder<
719                    DataFileWriterBuilder<
720                        ParquetWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>,
721                    >,
722                    MonitoredPositionDeleteWriterBuilder<
723                        ParquetWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>,
724                    >,
725                    EqualityDeleteFileWriterBuilder<
726                        ParquetWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>,
727                    >,
728                >,
729            >,
730        >,
731        arrow_schema_with_op_column: SchemaRef,
732    },
733    NonpartitionUpsert {
734        writer: Option<Box<dyn IcebergWriter>>,
735        writer_builder: MonitoredGeneralWriterBuilder<
736            EqualityDeltaWriterBuilder<
737                DataFileWriterBuilder<
738                    ParquetWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>,
739                >,
740                MonitoredPositionDeleteWriterBuilder<
741                    ParquetWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>,
742                >,
743                EqualityDeleteFileWriterBuilder<
744                    ParquetWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>,
745                >,
746            >,
747        >,
748        arrow_schema_with_op_column: SchemaRef,
749    },
750}
751
752impl IcebergWriterDispatch {
753    pub fn get_writer(&mut self) -> Option<&mut Box<dyn IcebergWriter>> {
754        match self {
755            IcebergWriterDispatch::PartitionAppendOnly { writer, .. }
756            | IcebergWriterDispatch::NonPartitionAppendOnly { writer, .. }
757            | IcebergWriterDispatch::PartitionUpsert { writer, .. }
758            | IcebergWriterDispatch::NonpartitionUpsert { writer, .. } => writer.as_mut(),
759        }
760    }
761}
762
763pub struct IcebergWriterMetrics {
764    // NOTE: These 2 metrics are not used directly by us, but only kept for lifecycle management.
765    // They are actually used in `PrometheusWriterBuilder`:
766    //     WriterMetrics::new(write_qps.deref().clone(), write_latency.deref().clone())
767    // We keep them here to let the guard cleans the labels from metrics registry when dropped
768    _write_qps: LabelGuardedIntCounter,
769    _write_latency: LabelGuardedHistogram,
770    write_bytes: LabelGuardedIntCounter,
771}
772
773impl IcebergSinkWriter {
774    pub async fn new_append_only(table: Table, writer_param: &SinkWriterParam) -> Result<Self> {
775        let SinkWriterParam {
776            extra_partition_col_idx,
777            actor_id,
778            sink_id,
779            sink_name,
780            ..
781        } = writer_param;
782        let metrics_labels = [
783            &actor_id.to_string(),
784            &sink_id.to_string(),
785            sink_name.as_str(),
786        ];
787
788        // Metrics
789        let write_qps = GLOBAL_SINK_METRICS
790            .iceberg_write_qps
791            .with_guarded_label_values(&metrics_labels);
792        let write_latency = GLOBAL_SINK_METRICS
793            .iceberg_write_latency
794            .with_guarded_label_values(&metrics_labels);
795        // # TODO
796        // Unused. Add this metrics later.
797        let _rolling_unflushed_data_file = GLOBAL_SINK_METRICS
798            .iceberg_rolling_unflushed_data_file
799            .with_guarded_label_values(&metrics_labels);
800        let write_bytes = GLOBAL_SINK_METRICS
801            .iceberg_write_bytes
802            .with_guarded_label_values(&metrics_labels);
803
804        let schema = table.metadata().current_schema();
805        let partition_spec = table.metadata().default_partition_spec();
806
807        // To avoid duplicate file name, each time the sink created will generate a unique uuid as file name suffix.
808        let unique_uuid_suffix = Uuid::now_v7();
809
810        let parquet_writer_properties = WriterProperties::builder()
811            .set_max_row_group_size(
812                writer_param
813                    .streaming_config
814                    .developer
815                    .iceberg_sink_write_parquet_max_row_group_rows,
816            )
817            .build();
818
819        let parquet_writer_builder = ParquetWriterBuilder::new(
820            parquet_writer_properties,
821            schema.clone(),
822            table.file_io().clone(),
823            DefaultLocationGenerator::new(table.metadata().clone())
824                .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
825            DefaultFileNameGenerator::new(
826                writer_param.actor_id.to_string(),
827                Some(unique_uuid_suffix.to_string()),
828                iceberg::spec::DataFileFormat::Parquet,
829            ),
830        );
831        let data_file_builder =
832            DataFileWriterBuilder::new(parquet_writer_builder, None, partition_spec.spec_id());
833        if partition_spec.fields().is_empty() {
834            let writer_builder = MonitoredGeneralWriterBuilder::new(
835                data_file_builder,
836                write_qps.clone(),
837                write_latency.clone(),
838            );
839            let inner_writer = Some(Box::new(
840                writer_builder
841                    .clone()
842                    .build()
843                    .await
844                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
845            ) as Box<dyn IcebergWriter>);
846            Ok(Self {
847                arrow_schema: Arc::new(
848                    schema_to_arrow_schema(table.metadata().current_schema())
849                        .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
850                ),
851                metrics: IcebergWriterMetrics {
852                    _write_qps: write_qps,
853                    _write_latency: write_latency,
854                    write_bytes,
855                },
856                writer: IcebergWriterDispatch::NonPartitionAppendOnly {
857                    writer: inner_writer,
858                    writer_builder,
859                },
860                table,
861                project_idx_vec: {
862                    if let Some(extra_partition_col_idx) = extra_partition_col_idx {
863                        ProjectIdxVec::Prepare(*extra_partition_col_idx)
864                    } else {
865                        ProjectIdxVec::None
866                    }
867                },
868            })
869        } else {
870            let partition_builder = MonitoredGeneralWriterBuilder::new(
871                FanoutPartitionWriterBuilder::new(
872                    data_file_builder,
873                    partition_spec.clone(),
874                    schema.clone(),
875                )
876                .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
877                write_qps.clone(),
878                write_latency.clone(),
879            );
880            let inner_writer = Some(Box::new(
881                partition_builder
882                    .clone()
883                    .build()
884                    .await
885                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
886            ) as Box<dyn IcebergWriter>);
887            Ok(Self {
888                arrow_schema: Arc::new(
889                    schema_to_arrow_schema(table.metadata().current_schema())
890                        .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
891                ),
892                metrics: IcebergWriterMetrics {
893                    _write_qps: write_qps,
894                    _write_latency: write_latency,
895                    write_bytes,
896                },
897                writer: IcebergWriterDispatch::PartitionAppendOnly {
898                    writer: inner_writer,
899                    writer_builder: partition_builder,
900                },
901                table,
902                project_idx_vec: {
903                    if let Some(extra_partition_col_idx) = extra_partition_col_idx {
904                        ProjectIdxVec::Prepare(*extra_partition_col_idx)
905                    } else {
906                        ProjectIdxVec::None
907                    }
908                },
909            })
910        }
911    }
912
913    pub async fn new_upsert(
914        table: Table,
915        unique_column_ids: Vec<usize>,
916        writer_param: &SinkWriterParam,
917    ) -> Result<Self> {
918        let SinkWriterParam {
919            extra_partition_col_idx,
920            actor_id,
921            sink_id,
922            sink_name,
923            ..
924        } = writer_param;
925        let metrics_labels = [
926            &actor_id.to_string(),
927            &sink_id.to_string(),
928            sink_name.as_str(),
929        ];
930        let unique_column_ids: Vec<_> = unique_column_ids.into_iter().map(|id| id as i32).collect();
931
932        // Metrics
933        let write_qps = GLOBAL_SINK_METRICS
934            .iceberg_write_qps
935            .with_guarded_label_values(&metrics_labels);
936        let write_latency = GLOBAL_SINK_METRICS
937            .iceberg_write_latency
938            .with_guarded_label_values(&metrics_labels);
939        // # TODO
940        // Unused. Add this metrics later.
941        let _rolling_unflushed_data_file = GLOBAL_SINK_METRICS
942            .iceberg_rolling_unflushed_data_file
943            .with_guarded_label_values(&metrics_labels);
944        let position_delete_cache_num = GLOBAL_SINK_METRICS
945            .iceberg_position_delete_cache_num
946            .with_guarded_label_values(&metrics_labels);
947        let write_bytes = GLOBAL_SINK_METRICS
948            .iceberg_write_bytes
949            .with_guarded_label_values(&metrics_labels);
950
951        // Determine the schema id and partition spec id
952        let schema = table.metadata().current_schema();
953        let partition_spec = table.metadata().default_partition_spec();
954
955        // To avoid duplicate file name, each time the sink created will generate a unique uuid as file name suffix.
956        let unique_uuid_suffix = Uuid::now_v7();
957
958        let parquet_writer_properties = WriterProperties::builder()
959            .set_max_row_group_size(
960                writer_param
961                    .streaming_config
962                    .developer
963                    .iceberg_sink_write_parquet_max_row_group_rows,
964            )
965            .build();
966
967        let data_file_builder = {
968            let parquet_writer_builder = ParquetWriterBuilder::new(
969                parquet_writer_properties.clone(),
970                schema.clone(),
971                table.file_io().clone(),
972                DefaultLocationGenerator::new(table.metadata().clone())
973                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
974                DefaultFileNameGenerator::new(
975                    writer_param.actor_id.to_string(),
976                    Some(unique_uuid_suffix.to_string()),
977                    iceberg::spec::DataFileFormat::Parquet,
978                ),
979            );
980            DataFileWriterBuilder::new(
981                parquet_writer_builder.clone(),
982                None,
983                partition_spec.spec_id(),
984            )
985        };
986        let position_delete_builder = {
987            let parquet_writer_builder = ParquetWriterBuilder::new(
988                parquet_writer_properties.clone(),
989                POSITION_DELETE_SCHEMA.clone(),
990                table.file_io().clone(),
991                DefaultLocationGenerator::new(table.metadata().clone())
992                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
993                DefaultFileNameGenerator::new(
994                    writer_param.actor_id.to_string(),
995                    Some(format!("pos-del-{}", unique_uuid_suffix)),
996                    iceberg::spec::DataFileFormat::Parquet,
997                ),
998            );
999            MonitoredPositionDeleteWriterBuilder::new(
1000                SortPositionDeleteWriterBuilder::new(
1001                    parquet_writer_builder.clone(),
1002                    writer_param
1003                        .streaming_config
1004                        .developer
1005                        .iceberg_sink_positional_delete_cache_size,
1006                    None,
1007                    None,
1008                ),
1009                position_delete_cache_num,
1010            )
1011        };
1012        let equality_delete_builder = {
1013            let config = EqualityDeleteWriterConfig::new(
1014                unique_column_ids.clone(),
1015                table.metadata().current_schema().clone(),
1016                None,
1017                partition_spec.spec_id(),
1018            )
1019            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
1020            let parquet_writer_builder = ParquetWriterBuilder::new(
1021                parquet_writer_properties.clone(),
1022                Arc::new(
1023                    arrow_schema_to_schema(config.projected_arrow_schema_ref())
1024                        .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1025                ),
1026                table.file_io().clone(),
1027                DefaultLocationGenerator::new(table.metadata().clone())
1028                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1029                DefaultFileNameGenerator::new(
1030                    writer_param.actor_id.to_string(),
1031                    Some(format!("eq-del-{}", unique_uuid_suffix)),
1032                    iceberg::spec::DataFileFormat::Parquet,
1033                ),
1034            );
1035
1036            EqualityDeleteFileWriterBuilder::new(parquet_writer_builder.clone(), config)
1037        };
1038        let delta_builder = EqualityDeltaWriterBuilder::new(
1039            data_file_builder,
1040            position_delete_builder,
1041            equality_delete_builder,
1042            unique_column_ids,
1043        );
1044        if partition_spec.fields().is_empty() {
1045            let writer_builder = MonitoredGeneralWriterBuilder::new(
1046                delta_builder,
1047                write_qps.clone(),
1048                write_latency.clone(),
1049            );
1050            let inner_writer = Some(Box::new(
1051                writer_builder
1052                    .clone()
1053                    .build()
1054                    .await
1055                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1056            ) as Box<dyn IcebergWriter>);
1057            let original_arrow_schema = Arc::new(
1058                schema_to_arrow_schema(table.metadata().current_schema())
1059                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1060            );
1061            let schema_with_extra_op_column = {
1062                let mut new_fields = original_arrow_schema.fields().iter().cloned().collect_vec();
1063                new_fields.push(Arc::new(ArrowField::new(
1064                    "op".to_owned(),
1065                    ArrowDataType::Int32,
1066                    false,
1067                )));
1068                Arc::new(ArrowSchema::new(new_fields))
1069            };
1070            Ok(Self {
1071                arrow_schema: original_arrow_schema,
1072                metrics: IcebergWriterMetrics {
1073                    _write_qps: write_qps,
1074                    _write_latency: write_latency,
1075                    write_bytes,
1076                },
1077                table,
1078                writer: IcebergWriterDispatch::NonpartitionUpsert {
1079                    writer: inner_writer,
1080                    writer_builder,
1081                    arrow_schema_with_op_column: schema_with_extra_op_column,
1082                },
1083                project_idx_vec: {
1084                    if let Some(extra_partition_col_idx) = extra_partition_col_idx {
1085                        ProjectIdxVec::Prepare(*extra_partition_col_idx)
1086                    } else {
1087                        ProjectIdxVec::None
1088                    }
1089                },
1090            })
1091        } else {
1092            let original_arrow_schema = Arc::new(
1093                schema_to_arrow_schema(table.metadata().current_schema())
1094                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1095            );
1096            let schema_with_extra_op_column = {
1097                let mut new_fields = original_arrow_schema.fields().iter().cloned().collect_vec();
1098                new_fields.push(Arc::new(ArrowField::new(
1099                    "op".to_owned(),
1100                    ArrowDataType::Int32,
1101                    false,
1102                )));
1103                Arc::new(ArrowSchema::new(new_fields))
1104            };
1105            let partition_builder = MonitoredGeneralWriterBuilder::new(
1106                FanoutPartitionWriterBuilder::new_with_custom_schema(
1107                    delta_builder,
1108                    schema_with_extra_op_column.clone(),
1109                    partition_spec.clone(),
1110                    table.metadata().current_schema().clone(),
1111                ),
1112                write_qps.clone(),
1113                write_latency.clone(),
1114            );
1115            let inner_writer = Some(Box::new(
1116                partition_builder
1117                    .clone()
1118                    .build()
1119                    .await
1120                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1121            ) as Box<dyn IcebergWriter>);
1122            Ok(Self {
1123                arrow_schema: original_arrow_schema,
1124                metrics: IcebergWriterMetrics {
1125                    _write_qps: write_qps,
1126                    _write_latency: write_latency,
1127                    write_bytes,
1128                },
1129                table,
1130                writer: IcebergWriterDispatch::PartitionUpsert {
1131                    writer: inner_writer,
1132                    writer_builder: partition_builder,
1133                    arrow_schema_with_op_column: schema_with_extra_op_column,
1134                },
1135                project_idx_vec: {
1136                    if let Some(extra_partition_col_idx) = extra_partition_col_idx {
1137                        ProjectIdxVec::Prepare(*extra_partition_col_idx)
1138                    } else {
1139                        ProjectIdxVec::None
1140                    }
1141                },
1142            })
1143        }
1144    }
1145}
1146
1147#[async_trait]
1148impl SinkWriter for IcebergSinkWriter {
1149    type CommitMetadata = Option<SinkMetadata>;
1150
1151    /// Begin a new epoch
1152    async fn begin_epoch(&mut self, _epoch: u64) -> Result<()> {
1153        // Just skip it.
1154        Ok(())
1155    }
1156
1157    /// Write a stream chunk to sink
1158    async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> {
1159        // Try to build writer if it's None.
1160        match &mut self.writer {
1161            IcebergWriterDispatch::PartitionAppendOnly {
1162                writer,
1163                writer_builder,
1164            } => {
1165                if writer.is_none() {
1166                    *writer = Some(Box::new(
1167                        writer_builder
1168                            .clone()
1169                            .build()
1170                            .await
1171                            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1172                    ));
1173                }
1174            }
1175            IcebergWriterDispatch::NonPartitionAppendOnly {
1176                writer,
1177                writer_builder,
1178            } => {
1179                if writer.is_none() {
1180                    *writer = Some(Box::new(
1181                        writer_builder
1182                            .clone()
1183                            .build()
1184                            .await
1185                            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1186                    ));
1187                }
1188            }
1189            IcebergWriterDispatch::PartitionUpsert {
1190                writer,
1191                writer_builder,
1192                ..
1193            } => {
1194                if writer.is_none() {
1195                    *writer = Some(Box::new(
1196                        writer_builder
1197                            .clone()
1198                            .build()
1199                            .await
1200                            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1201                    ));
1202                }
1203            }
1204            IcebergWriterDispatch::NonpartitionUpsert {
1205                writer,
1206                writer_builder,
1207                ..
1208            } => {
1209                if writer.is_none() {
1210                    *writer = Some(Box::new(
1211                        writer_builder
1212                            .clone()
1213                            .build()
1214                            .await
1215                            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1216                    ));
1217                }
1218            }
1219        };
1220
1221        // Process the chunk.
1222        let (mut chunk, ops) = chunk.compact().into_parts();
1223        match &self.project_idx_vec {
1224            ProjectIdxVec::None => {}
1225            ProjectIdxVec::Prepare(idx) => {
1226                if *idx >= chunk.columns().len() {
1227                    return Err(SinkError::Iceberg(anyhow!(
1228                        "invalid extra partition column index {}",
1229                        idx
1230                    )));
1231                }
1232                let project_idx_vec = (0..*idx)
1233                    .chain(*idx + 1..chunk.columns().len())
1234                    .collect_vec();
1235                chunk = chunk.project(&project_idx_vec);
1236                self.project_idx_vec = ProjectIdxVec::Done(project_idx_vec);
1237            }
1238            ProjectIdxVec::Done(idx_vec) => {
1239                chunk = chunk.project(idx_vec);
1240            }
1241        }
1242        if ops.is_empty() {
1243            return Ok(());
1244        }
1245        let write_batch_size = chunk.estimated_heap_size();
1246        let batch = match &self.writer {
1247            IcebergWriterDispatch::PartitionAppendOnly { .. }
1248            | IcebergWriterDispatch::NonPartitionAppendOnly { .. } => {
1249                // separate out insert chunk
1250                let filters =
1251                    chunk.visibility() & ops.iter().map(|op| *op == Op::Insert).collect::<Bitmap>();
1252                chunk.set_visibility(filters);
1253                IcebergArrowConvert
1254                    .to_record_batch(self.arrow_schema.clone(), &chunk.compact())
1255                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?
1256            }
1257            IcebergWriterDispatch::PartitionUpsert {
1258                arrow_schema_with_op_column,
1259                ..
1260            }
1261            | IcebergWriterDispatch::NonpartitionUpsert {
1262                arrow_schema_with_op_column,
1263                ..
1264            } => {
1265                let chunk = IcebergArrowConvert
1266                    .to_record_batch(self.arrow_schema.clone(), &chunk)
1267                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
1268                let ops = Arc::new(Int32Array::from(
1269                    ops.iter()
1270                        .map(|op| match op {
1271                            Op::UpdateInsert | Op::Insert => INSERT_OP,
1272                            Op::UpdateDelete | Op::Delete => DELETE_OP,
1273                        })
1274                        .collect_vec(),
1275                ));
1276                let mut columns = chunk.columns().to_vec();
1277                columns.push(ops);
1278                RecordBatch::try_new(arrow_schema_with_op_column.clone(), columns)
1279                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?
1280            }
1281        };
1282
1283        let writer = self.writer.get_writer().unwrap();
1284        writer
1285            .write(batch)
1286            .instrument_await("iceberg_write")
1287            .await
1288            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
1289        self.metrics.write_bytes.inc_by(write_batch_size as _);
1290        Ok(())
1291    }
1292
1293    /// Receive a barrier and mark the end of current epoch. When `is_checkpoint` is true, the sink
1294    /// writer should commit the current epoch.
1295    async fn barrier(&mut self, is_checkpoint: bool) -> Result<Option<SinkMetadata>> {
1296        // Skip it if not checkpoint
1297        if !is_checkpoint {
1298            return Ok(None);
1299        }
1300
1301        let close_result = match &mut self.writer {
1302            IcebergWriterDispatch::PartitionAppendOnly {
1303                writer,
1304                writer_builder,
1305            } => {
1306                let close_result = match writer.take() {
1307                    Some(mut writer) => {
1308                        Some(writer.close().instrument_await("iceberg_close").await)
1309                    }
1310                    _ => None,
1311                };
1312                match writer_builder.clone().build().await {
1313                    Ok(new_writer) => {
1314                        *writer = Some(Box::new(new_writer));
1315                    }
1316                    _ => {
1317                        // In this case, the writer is closed and we can't build a new writer. But we can't return the error
1318                        // here because current writer may close successfully. So we just log the error.
1319                        warn!("Failed to build new writer after close");
1320                    }
1321                }
1322                close_result
1323            }
1324            IcebergWriterDispatch::NonPartitionAppendOnly {
1325                writer,
1326                writer_builder,
1327            } => {
1328                let close_result = match writer.take() {
1329                    Some(mut writer) => {
1330                        Some(writer.close().instrument_await("iceberg_close").await)
1331                    }
1332                    _ => None,
1333                };
1334                match writer_builder.clone().build().await {
1335                    Ok(new_writer) => {
1336                        *writer = Some(Box::new(new_writer));
1337                    }
1338                    _ => {
1339                        // In this case, the writer is closed and we can't build a new writer. But we can't return the error
1340                        // here because current writer may close successfully. So we just log the error.
1341                        warn!("Failed to build new writer after close");
1342                    }
1343                }
1344                close_result
1345            }
1346            IcebergWriterDispatch::PartitionUpsert {
1347                writer,
1348                writer_builder,
1349                ..
1350            } => {
1351                let close_result = match writer.take() {
1352                    Some(mut writer) => {
1353                        Some(writer.close().instrument_await("iceberg_close").await)
1354                    }
1355                    _ => None,
1356                };
1357                match writer_builder.clone().build().await {
1358                    Ok(new_writer) => {
1359                        *writer = Some(Box::new(new_writer));
1360                    }
1361                    _ => {
1362                        // The writer is closed but we failed to build a new one. We can't return the error
1363                        // here because the current writer may have closed successfully, so just log it.
1364                        warn!("Failed to build new writer after close");
1365                    }
1366                }
1367                close_result
1368            }
1369            IcebergWriterDispatch::NonpartitionUpsert {
1370                writer,
1371                writer_builder,
1372                ..
1373            } => {
1374                let close_result = match writer.take() {
1375                    Some(mut writer) => {
1376                        Some(writer.close().instrument_await("iceberg_close").await)
1377                    }
1378                    _ => None,
1379                };
1380                match writer_builder.clone().build().await {
1381                    Ok(new_writer) => {
1382                        *writer = Some(Box::new(new_writer));
1383                    }
1384                    _ => {
1385                        // The writer is closed but we failed to build a new one. We can't return the error
1386                        // here because the current writer may have closed successfully, so just log it.
1387                        warn!("Failed to build new writer after close");
1388                    }
1389                }
1390                close_result
1391            }
1392        };
1393
1394        match close_result {
1395            Some(Ok(result)) => {
1396                let version = self.table.metadata().format_version() as u8;
1397                let partition_type = self.table.metadata().default_partition_type();
1398                let data_files = result
1399                    .into_iter()
1400                    .map(|f| {
1401                        SerializedDataFile::try_from(f, partition_type, version == 1)
1402                            .map_err(|err| SinkError::Iceberg(anyhow!(err)))
1403                    })
1404                    .collect::<Result<Vec<_>>>()?;
1405                Ok(Some(SinkMetadata::try_from(&IcebergCommitResult {
1406                    data_files,
1407                    schema_id: self.table.metadata().current_schema_id(),
1408                    partition_spec_id: self.table.metadata().default_partition_spec_id(),
1409                })?))
1410            }
1411            Some(Err(err)) => Err(SinkError::Iceberg(anyhow!(err))),
1412            None => Err(SinkError::Iceberg(anyhow!("No writer to close"))),
1413        }
1414    }
1415
1416    /// Clean up
1417    async fn abort(&mut self) -> Result<()> {
1418        // TODO: abort should clean up all the data written in this epoch.
1419        Ok(())
1420    }
1421}
1422
1423const SCHEMA_ID: &str = "schema_id";
1424const PARTITION_SPEC_ID: &str = "partition_spec_id";
1425const DATA_FILES: &str = "data_files";
1426
1427#[derive(Default, Clone)]
1428struct IcebergCommitResult {
1429    schema_id: i32,
1430    partition_spec_id: i32,
1431    data_files: Vec<SerializedDataFile>,
1432}
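
// The pre-commit metadata exchanged between sink writers and the commit coordinator is the JSON
// object produced by the conversions below, keyed by the constants above. A minimal sketch of the
// payload (the concrete values are illustrative only):
//
//     {
//         "schema_id": 0,
//         "partition_spec_id": 0,
//         "data_files": [ /* `SerializedDataFile` entries */ ]
//     }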
1433
1434impl IcebergCommitResult {
1435    fn try_from(value: &SinkMetadata) -> Result<Self> {
1436        if let Some(Serialized(v)) = &value.metadata {
1437            let mut values = if let serde_json::Value::Object(v) =
1438                serde_json::from_slice::<serde_json::Value>(&v.metadata)
1439                    .context("Can't parse iceberg sink metadata")?
1440            {
1441                v
1442            } else {
1443                bail!("iceberg sink metadata should be an object");
1444            };
1445
1446            let schema_id;
1447            if let Some(serde_json::Value::Number(value)) = values.remove(SCHEMA_ID) {
1448                schema_id = value
1449                    .as_u64()
1450                    .ok_or_else(|| anyhow!("schema_id should be a u64"))?;
1451            } else {
1452                bail!("iceberg sink metadata should have schema_id");
1453            }
1454
1455            let partition_spec_id;
1456            if let Some(serde_json::Value::Number(value)) = values.remove(PARTITION_SPEC_ID) {
1457                partition_spec_id = value
1458                    .as_u64()
1459                    .ok_or_else(|| anyhow!("partition_spec_id should be a u64"))?;
1460            } else {
1461                bail!("iceberg sink metadata should have partition_spec_id");
1462            }
1463
1464            let data_files: Vec<SerializedDataFile>;
1465            if let serde_json::Value::Array(values) = values
1466                .remove(DATA_FILES)
1467                .ok_or_else(|| anyhow!("iceberg sink metadata should have data_files object"))?
1468            {
1469                data_files = values
1470                    .into_iter()
1471                    .map(from_value::<SerializedDataFile>)
1472                    .collect::<std::result::Result<_, _>>()
1473                    .context("Can't deserialize data files in iceberg sink metadata")?;
1474            } else {
1475                bail!("iceberg sink metadata should have data_files object");
1476            }
1477
1478            Ok(Self {
1479                schema_id: schema_id as i32,
1480                partition_spec_id: partition_spec_id as i32,
1481                data_files,
1482            })
1483        } else {
1484            bail!("Can't create iceberg sink write result from empty data!")
1485        }
1486    }
1487
1488    fn try_from_serialized_bytes(value: Vec<u8>) -> Result<Self> {
1489        let mut values = if let serde_json::Value::Object(value) =
1490            serde_json::from_slice::<serde_json::Value>(&value)
1491                .context("Can't parse iceberg sink metadata")?
1492        {
1493            value
1494        } else {
1495            bail!("iceberg sink metadata should be an object");
1496        };
1497
1498        let schema_id;
1499        if let Some(serde_json::Value::Number(value)) = values.remove(SCHEMA_ID) {
1500            schema_id = value
1501                .as_u64()
1502                .ok_or_else(|| anyhow!("schema_id should be a u64"))?;
1503        } else {
1504            bail!("iceberg sink metadata should have schema_id");
1505        }
1506
1507        let partition_spec_id;
1508        if let Some(serde_json::Value::Number(value)) = values.remove(PARTITION_SPEC_ID) {
1509            partition_spec_id = value
1510                .as_u64()
1511                .ok_or_else(|| anyhow!("partition_spec_id should be a u64"))?;
1512        } else {
1513            bail!("iceberg sink metadata should have partition_spec_id");
1514        }
1515
1516        let data_files: Vec<SerializedDataFile>;
1517        if let serde_json::Value::Array(values) = values
1518            .remove(DATA_FILES)
1519            .ok_or_else(|| anyhow!("iceberg sink metadata should have data_files object"))?
1520        {
1521            data_files = values
1522                .into_iter()
1523                .map(from_value::<SerializedDataFile>)
1524                .collect::<std::result::Result<_, _>>()
1525                .context("Can't deserialize data files in iceberg sink metadata")?;
1526        } else {
1527            bail!("iceberg sink metadata should have data_files object");
1528        }
1529
1530        Ok(Self {
1531            schema_id: schema_id as i32,
1532            partition_spec_id: partition_spec_id as i32,
1533            data_files,
1534        })
1535    }
1536}
1537
1538impl<'a> TryFrom<&'a IcebergCommitResult> for SinkMetadata {
1539    type Error = SinkError;
1540
1541    fn try_from(value: &'a IcebergCommitResult) -> std::result::Result<SinkMetadata, Self::Error> {
1542        let json_data_files = serde_json::Value::Array(
1543            value
1544                .data_files
1545                .iter()
1546                .map(serde_json::to_value)
1547                .collect::<std::result::Result<Vec<serde_json::Value>, _>>()
1548                .context("Can't serialize data files to json")?,
1549        );
1550        let json_value = serde_json::Value::Object(
1551            vec![
1552                (
1553                    SCHEMA_ID.to_owned(),
1554                    serde_json::Value::Number(value.schema_id.into()),
1555                ),
1556                (
1557                    PARTITION_SPEC_ID.to_owned(),
1558                    serde_json::Value::Number(value.partition_spec_id.into()),
1559                ),
1560                (DATA_FILES.to_owned(), json_data_files),
1561            ]
1562            .into_iter()
1563            .collect(),
1564        );
1565        Ok(SinkMetadata {
1566            metadata: Some(Serialized(SerializedMetadata {
1567                metadata: serde_json::to_vec(&json_value)
1568                    .context("Can't serialize iceberg sink metadata")?,
1569            })),
1570        })
1571    }
1572}
1573
1574impl TryFrom<IcebergCommitResult> for Vec<u8> {
1575    type Error = SinkError;
1576
1577    fn try_from(value: IcebergCommitResult) -> std::result::Result<Vec<u8>, Self::Error> {
1578        let json_data_files = serde_json::Value::Array(
1579            value
1580                .data_files
1581                .iter()
1582                .map(serde_json::to_value)
1583                .collect::<std::result::Result<Vec<serde_json::Value>, _>>()
1584                .context("Can't serialize data files to json")?,
1585        );
1586        let json_value = serde_json::Value::Object(
1587            vec![
1588                (
1589                    SCHEMA_ID.to_owned(),
1590                    serde_json::Value::Number(value.schema_id.into()),
1591                ),
1592                (
1593                    PARTITION_SPEC_ID.to_owned(),
1594                    serde_json::Value::Number(value.partition_spec_id.into()),
1595                ),
1596                (DATA_FILES.to_owned(), json_data_files),
1597            ]
1598            .into_iter()
1599            .collect(),
1600        );
1601        Ok(serde_json::to_vec(&json_value).context("Can't serialize iceberg sink metadata")?)
1602    }
1603}

1604pub struct IcebergSinkCommitter {
1605    catalog: Arc<dyn Catalog>,
1606    table: Table,
1607    pub last_commit_epoch: u64,
1608    pub(crate) is_exactly_once: bool,
1609    pub(crate) sink_id: u32,
1610    pub(crate) config: IcebergConfig,
1611    pub(crate) param: SinkParam,
1612    pub(crate) db: DatabaseConnection,
1613    pub(crate) committed_epoch_subscriber: Option<SinkCommittedEpochSubscriber>,
1614    commit_retry_num: u32,
1615    pub(crate) iceberg_compact_stat_sender: Option<UnboundedSender<IcebergSinkCompactionUpdate>>,
1616}
1617
1618impl IcebergSinkCommitter {
1619    // Reload the table and guarantee that its current schema_id and partition_spec_id match the
1620    // given `schema_id` and `partition_spec_id`.
1621    async fn reload_table(
1622        catalog: &dyn Catalog,
1623        table_ident: &TableIdent,
1624        schema_id: i32,
1625        partition_spec_id: i32,
1626    ) -> Result<Table> {
1627        let table = catalog
1628            .load_table(table_ident)
1629            .await
1630            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
1631        if table.metadata().current_schema_id() != schema_id {
1632            return Err(SinkError::Iceberg(anyhow!(
1633                "Schema evolution not supported, expect schema id {}, but got {}",
1634                schema_id,
1635                table.metadata().current_schema_id()
1636            )));
1637        }
1638        if table.metadata().default_partition_spec_id() != partition_spec_id {
1639            return Err(SinkError::Iceberg(anyhow!(
1640                "Partition evolution not supported, expect partition spec id {}, but got {}",
1641                partition_spec_id,
1642                table.metadata().default_partition_spec_id()
1643            )));
1644        }
1645        Ok(table)
1646    }
1647}
1648
1649#[async_trait::async_trait]
1650impl SinkCommitCoordinator for IcebergSinkCommitter {
1651    async fn init(&mut self, subscriber: SinkCommittedEpochSubscriber) -> Result<Option<u64>> {
1652        if self.is_exactly_once {
1653            self.committed_epoch_subscriber = Some(subscriber);
1654            tracing::info!(
1655                "Sink id = {}: iceberg sink coordinator initializing.",
1656                self.param.sink_id.sink_id()
1657            );
1658            if iceberg_sink_has_pre_commit_metadata(&self.db, self.param.sink_id.sink_id()).await? {
1659                let ordered_metadata_list_by_end_epoch =
1660                    get_pre_commit_info_by_sink_id(&self.db, self.param.sink_id.sink_id()).await?;
1661
1662                let mut last_recommit_epoch = 0;
1663                for (end_epoch, serialized_bytes, snapshot_id, committed) in
1664                    ordered_metadata_list_by_end_epoch
1665                {
1666                    let write_results_bytes = deserialize_metadata(serialized_bytes);
1667                    let mut write_results = vec![];
1668
1669                    for each in write_results_bytes {
1670                        let write_result = IcebergCommitResult::try_from_serialized_bytes(each)?;
1671                        write_results.push(write_result);
1672                    }
1673
1674                    match (
1675                        committed,
1676                        self.is_snapshot_id_in_iceberg(&self.config, snapshot_id)
1677                            .await?,
1678                    ) {
1679                        (true, _) => {
1680                            tracing::info!(
1681                                "Sink id = {}: all data in the log store has been written into the external sink; nothing to do during recovery.",
1682                                self.param.sink_id.sink_id()
1683                            );
1684                        }
1685                        (false, true) => {
1686                            // skip
1687                            tracing::info!(
1688                                "Sink id = {}: all pre-commit files have been successfully committed into iceberg and do not need to be committed again; mark them as committed.",
1689                                self.param.sink_id.sink_id()
1690                            );
1691                            mark_row_is_committed_by_sink_id_and_end_epoch(
1692                                &self.db,
1693                                self.sink_id,
1694                                end_epoch,
1695                            )
1696                            .await?;
1697                        }
1698                        (false, false) => {
1699                            tracing::info!(
1700                                "Sink id = {}: there are files that were not successfully committed; re-commit these files.",
1701                                self.param.sink_id.sink_id()
1702                            );
1703                            self.re_commit(end_epoch, write_results, snapshot_id)
1704                                .await?;
1705                        }
1706                    }
1707
1708                    last_recommit_epoch = end_epoch;
1709                }
1710                tracing::info!(
1711                    "Sink id = {}: iceberg commit coordinator initialized.",
1712                    self.param.sink_id.sink_id()
1713                );
1714                return Ok(Some(last_recommit_epoch));
1715            } else {
1716                tracing::info!(
1717                    "Sink id = {}: iceberg coordinator initialized, and the system table is empty.",
1718                    self.param.sink_id.sink_id()
1719                );
1720                return Ok(None);
1721            }
1722        }
1723
1724        tracing::info!("Iceberg commit coordinator initialized.");
1725        return Ok(None);
1726    }
1727
1728    async fn commit(
1729        &mut self,
1730        epoch: u64,
1731        metadata: Vec<SinkMetadata>,
1732        add_columns: Option<Vec<Field>>,
1733    ) -> Result<()> {
1734        tracing::info!("Starting iceberg commit in epoch {epoch}.");
1735        if let Some(add_columns) = add_columns {
1736            return Err(anyhow!(
1737                "Iceberg sink does not support adding columns, but got: {:?}",
1738                add_columns
1739            )
1740            .into());
1741        }
1742
1743        let write_results: Vec<IcebergCommitResult> = metadata
1744            .iter()
1745            .map(IcebergCommitResult::try_from)
1746            .collect::<Result<Vec<IcebergCommitResult>>>()?;
1747
1748        // Skip if no data to commit
1749        if write_results.is_empty() || write_results.iter().all(|r| r.data_files.is_empty()) {
1750            tracing::debug!(?epoch, "no data to commit");
1751            return Ok(());
1752        }
1753
1754        // Guarantee that all write results have the same schema_id and partition_spec_id.
1755        if write_results
1756            .iter()
1757            .any(|r| r.schema_id != write_results[0].schema_id)
1758            || write_results
1759                .iter()
1760                .any(|r| r.partition_spec_id != write_results[0].partition_spec_id)
1761        {
1762            return Err(SinkError::Iceberg(anyhow!(
1763                "schema_id and partition_spec_id should be the same in all write results"
1764            )));
1765        }
1766
1767        // Check snapshot limit before proceeding with commit
1768        self.wait_for_snapshot_limit().await?;
1769
1770        if self.is_exactly_once {
1771            assert!(self.committed_epoch_subscriber.is_some());
1772            match self.committed_epoch_subscriber.clone() {
1773                Some(committed_epoch_subscriber) => {
1774                    // Get the latest committed_epoch and the receiver
1775                    let (committed_epoch, mut rw_futures_utilrx) =
1776                        committed_epoch_subscriber(self.param.sink_id).await?;
1777                    // The exactly once commit process needs to start after the data corresponding to the current epoch is persisted in the log store.
1778                    if committed_epoch >= epoch {
1779                        self.commit_iceberg_inner(epoch, write_results, None)
1780                            .await?;
1781                    } else {
1782                        tracing::info!(
1783                            "Waiting for the committed epoch to rise. Current: {}, Waiting for: {}",
1784                            committed_epoch,
1785                            epoch
1786                        );
1787                        while let Some(next_committed_epoch) = rw_futures_utilrx.recv().await {
1788                            tracing::info!(
1789                                "Received next committed epoch: {}",
1790                                next_committed_epoch
1791                            );
1792                            // If the next committed epoch meets the condition, execute the commit immediately.
1793                            if next_committed_epoch >= epoch {
1794                                self.commit_iceberg_inner(epoch, write_results, None)
1795                                    .await?;
1796                                break;
1797                            }
1798                        }
1799                    }
1800                }
1801                None => unreachable!(
1802                    "Exactly once sink must wait epoch before committing, committed_epoch_subscriber is not initialized."
1803                ),
1804            }
1805        } else {
1806            self.commit_iceberg_inner(epoch, write_results, None)
1807                .await?;
1808        }
1809
1810        Ok(())
1811    }
1812}
1813
1814/// Methods Required to Achieve Exactly Once Semantics
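///
/// Pre-commit metadata and a pre-generated snapshot id are persisted in the system table before
/// committing to Iceberg; after a successful commit the row is marked as committed and then
/// deleted. On recovery, a pre-commit row whose snapshot id is already visible in Iceberg is only
/// marked as committed, while any other uncommitted row is re-committed with the same snapshot id.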
1815impl IcebergSinkCommitter {
1816    async fn re_commit(
1817        &mut self,
1818        epoch: u64,
1819        write_results: Vec<IcebergCommitResult>,
1820        snapshot_id: i64,
1821    ) -> Result<()> {
1822        tracing::info!("Starting iceberg re-commit in epoch {epoch}.");
1823
1824        // Skip if no data to commit
1825        if write_results.is_empty() || write_results.iter().all(|r| r.data_files.is_empty()) {
1826            tracing::debug!(?epoch, "no data to commit");
1827            return Ok(());
1828        }
1829        self.commit_iceberg_inner(epoch, write_results, Some(snapshot_id))
1830            .await?;
1831        Ok(())
1832    }
1833
1834    async fn commit_iceberg_inner(
1835        &mut self,
1836        epoch: u64,
1837        write_results: Vec<IcebergCommitResult>,
1838        snapshot_id: Option<i64>,
1839    ) -> Result<()> {
1840        // If the provided `snapshot_id` is not None, it indicates that this commit is a re-commit
1841        // occurring during the recovery phase. In this case, we need to use the `snapshot_id`
1842        // that was previously persisted in the system table to commit.
1843        let is_first_commit = snapshot_id.is_none();
1844        self.last_commit_epoch = epoch;
1845        let expect_schema_id = write_results[0].schema_id;
1846        let expect_partition_spec_id = write_results[0].partition_spec_id;
1847
1848        // Load the latest table to avoid concurrent modification, on a best-effort basis.
1849        self.table = Self::reload_table(
1850            self.catalog.as_ref(),
1851            self.table.identifier(),
1852            expect_schema_id,
1853            expect_partition_spec_id,
1854        )
1855        .await?;
1856        let Some(schema) = self.table.metadata().schema_by_id(expect_schema_id) else {
1857            return Err(SinkError::Iceberg(anyhow!(
1858                "Can't find schema by id {}",
1859                expect_schema_id
1860            )));
1861        };
1862        let Some(partition_spec) = self
1863            .table
1864            .metadata()
1865            .partition_spec_by_id(expect_partition_spec_id)
1866        else {
1867            return Err(SinkError::Iceberg(anyhow!(
1868                "Can't find partition spec by id {}",
1869                expect_partition_spec_id
1870            )));
1871        };
1872        let partition_type = partition_spec
1873            .as_ref()
1874            .clone()
1875            .partition_type(schema)
1876            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
1877
1878        let txn = Transaction::new(&self.table);
1879        // Only generate a new snapshot id on the first commit.
1880        let snapshot_id = match snapshot_id {
1881            Some(previous_snapshot_id) => previous_snapshot_id,
1882            None => txn.generate_unique_snapshot_id(),
1883        };
1884        if self.is_exactly_once && is_first_commit {
1885            // Persist pre-commit metadata and the snapshot id in the system table.
1886            let mut pre_commit_metadata_bytes = Vec::new();
1887            for each_parallelism_write_result in write_results.clone() {
1888                let each_parallelism_write_result_bytes: Vec<u8> =
1889                    each_parallelism_write_result.try_into()?;
1890                pre_commit_metadata_bytes.push(each_parallelism_write_result_bytes);
1891            }
1892
1893            let pre_commit_metadata_bytes: Vec<u8> = serialize_metadata(pre_commit_metadata_bytes);
1894
1895            persist_pre_commit_metadata(
1896                self.sink_id,
1897                self.db.clone(),
1898                self.last_commit_epoch,
1899                epoch,
1900                pre_commit_metadata_bytes,
1901                snapshot_id,
1902            )
1903            .await?;
1904        }
1905
1906        let data_files = write_results
1907            .into_iter()
1908            .flat_map(|r| {
1909                r.data_files.into_iter().map(|f| {
1910                    f.try_into(expect_partition_spec_id, &partition_type, schema)
1911                        .map_err(|err| SinkError::Iceberg(anyhow!(err)))
1912                })
1913            })
1914            .collect::<Result<Vec<DataFile>>>()?;
1915        // TODO:
1916        // This retry behavior should be reverted and handled in iceberg-rust once it supports retry
1917        // (tracked in: https://github.com/apache/iceberg-rust/issues/964), because the retry logic
1918        // involves re-applying the commit metadata. For now, we just retry the whole commit operation.
1919        let retry_strategy = ExponentialBackoff::from_millis(10)
1920            .max_delay(Duration::from_secs(60))
1921            .map(jitter)
1922            .take(self.commit_retry_num as usize);
1923        let catalog = self.catalog.clone();
1924        let table_ident = self.table.identifier().clone();
1925        let table = Retry::spawn(retry_strategy, || async {
1926            let table = Self::reload_table(
1927                catalog.as_ref(),
1928                &table_ident,
1929                expect_schema_id,
1930                expect_partition_spec_id,
1931            )
1932            .await?;
1933            let txn = Transaction::new(&table);
1934            let mut append_action = txn
1935                .fast_append(Some(snapshot_id), None, vec![])
1936                .map_err(|err| SinkError::Iceberg(anyhow!(err)))?
1937                .with_to_branch(commit_branch(
1938                    self.config.r#type.as_str(),
1939                    self.config.write_mode.as_str(),
1940                ));
1941            append_action
1942                .add_data_files(data_files.clone())
1943                .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
1944            let tx = append_action.apply().await.map_err(|err| {
1945                tracing::error!(error = %err.as_report(), "Failed to apply iceberg table");
1946                SinkError::Iceberg(anyhow!(err))
1947            })?;
1948            tx.commit(self.catalog.as_ref()).await.map_err(|err| {
1949                tracing::error!(error = %err.as_report(), "Failed to commit iceberg table");
1950                SinkError::Iceberg(anyhow!(err))
1951            })
1952        })
1953        .await?;
1954        self.table = table;
1955
1956        let snapshot_num = self.table.metadata().snapshots().count();
1957        let catalog_name = self.config.common.catalog_name();
1958        let table_name = self.table.identifier().to_string();
1959        let metrics_labels = [&self.param.sink_name, &catalog_name, &table_name];
1960        GLOBAL_SINK_METRICS
1961            .iceberg_snapshot_num
1962            .with_guarded_label_values(&metrics_labels)
1963            .set(snapshot_num as i64);
1964
1965        tracing::info!("Succeeded in committing to iceberg table in epoch {epoch}.");
1966
1967        if self.is_exactly_once {
1968            mark_row_is_committed_by_sink_id_and_end_epoch(&self.db, self.sink_id, epoch).await?;
1969            tracing::info!(
1970                "Sink id = {}: succeeded in marking pre-commit metadata in epoch {} as committed.",
1971                self.sink_id,
1972                epoch
1973            );
1974
1975            delete_row_by_sink_id_and_end_epoch(&self.db, self.sink_id, epoch).await?;
1976        }
1977
1978        if let Some(iceberg_compact_stat_sender) = &self.iceberg_compact_stat_sender
1979            && self.config.enable_compaction
1980            && iceberg_compact_stat_sender
1981                .send(IcebergSinkCompactionUpdate {
1982                    sink_id: SinkId::new(self.sink_id),
1983                    compaction_interval: self.config.compaction_interval_sec(),
1984                    force_compaction: false,
1985                })
1986                .is_err()
1987        {
1988            warn!("failed to send iceberg compaction stats");
1989        }
1990
1991        Ok(())
1992    }
1993
1994    /// When persisting pre-commit metadata, we record the `snapshot_id` corresponding to each batch of files.
1995    /// Therefore, the logic for checking whether all files in this batch are present in Iceberg
1996    /// has been changed to verifying if their corresponding `snapshot_id` exists in Iceberg.
1997    async fn is_snapshot_id_in_iceberg(
1998        &self,
1999        iceberg_config: &IcebergConfig,
2000        snapshot_id: i64,
2001    ) -> Result<bool> {
2002        let iceberg_common = iceberg_config.common.clone();
2003        let table = iceberg_common
2004            .load_table(&iceberg_config.java_catalog_props)
2005            .await?;
2006        Ok(table.metadata().snapshot_by_id(snapshot_id).is_some())
2011    }
2012
2013    /// Count the snapshots created since the last rewrite/overwrite operation.
2014    /// Returns the number of snapshots since the last rewrite/overwrite.
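    ///
    /// For example (hypothetical history), if the snapshots from oldest to newest are
    /// `Replace, Append, Append`, this returns 2.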
2015    fn count_snapshots_since_rewrite(&self) -> usize {
2016        let mut snapshots: Vec<_> = self.table.metadata().snapshots().collect();
2017        snapshots.sort_by_key(|b| std::cmp::Reverse(b.timestamp_ms()));
2018
2019        // Iterate through snapshots in reverse order (newest first) to find the last rewrite/overwrite
2020        let mut count = 0;
2021        for snapshot in snapshots {
2022            // Check if this snapshot represents a rewrite or overwrite operation
2023            let summary = snapshot.summary();
2024            match &summary.operation {
2025                Operation::Replace => {
2026                    // Found a rewrite/overwrite operation, stop counting
2027                    break;
2028                }
2029
2030                _ => {
2031                    // Increment count for each snapshot that is not a rewrite/overwrite
2032                    count += 1;
2033                }
2034            }
2035        }
2036
2037        count
2038    }
2039
2040    /// Wait until the snapshot count since the last rewrite is below the limit.
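    ///
    /// While the count is at or above the limit, this nudges the compaction manager with a
    /// `force_compaction` request, sleeps for 30 seconds, reloads the table, and checks again.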
2041    async fn wait_for_snapshot_limit(&mut self) -> Result<()> {
2042        if !self.config.enable_compaction {
2043            return Ok(());
2044        }
2045
2046        if let Some(max_snapshots) = self.config.max_snapshots_num_before_compaction {
2047            loop {
2048                let current_count = self.count_snapshots_since_rewrite();
2049
2050                if current_count < max_snapshots {
2051                    tracing::info!(
2052                        "Snapshot count check passed: {} < {}",
2053                        current_count,
2054                        max_snapshots
2055                    );
2056                    break;
2057                }
2058
2059                tracing::info!(
2060                    "Snapshot count {} has reached the limit {}, waiting...",
2061                    current_count,
2062                    max_snapshots
2063                );
2064
2065                if let Some(iceberg_compact_stat_sender) = &self.iceberg_compact_stat_sender
2066                    && iceberg_compact_stat_sender
2067                        .send(IcebergSinkCompactionUpdate {
2068                            sink_id: SinkId::new(self.sink_id),
2069                            compaction_interval: self.config.compaction_interval_sec(),
2070                            force_compaction: true,
2071                        })
2072                        .is_err()
2073                {
2074                    tracing::warn!("failed to send iceberg compaction stats");
2075                }
2076
2077                // Wait for 30 seconds before checking again
2078                tokio::time::sleep(Duration::from_secs(30)).await;
2079
2080                // Reload table to get latest snapshots
2081                self.table = self.config.load_table().await?;
2082            }
2083        }
2084        Ok(())
2085    }
2086}
2087
2088const MAP_KEY: &str = "key";
2089const MAP_VALUE: &str = "value";
2090
2091fn get_fields<'a>(
2092    our_field_type: &'a risingwave_common::types::DataType,
2093    data_type: &ArrowDataType,
2094    schema_fields: &mut HashMap<&'a str, &'a risingwave_common::types::DataType>,
2095) -> Option<ArrowFields> {
2096    match data_type {
2097        ArrowDataType::Struct(fields) => {
2098            match our_field_type {
2099                risingwave_common::types::DataType::Struct(struct_fields) => {
2100                    struct_fields.iter().for_each(|(name, data_type)| {
2101                        let res = schema_fields.insert(name, data_type);
2102                        // This assert is to make sure there is no duplicate field name in the schema.
2103                        assert!(res.is_none())
2104                    });
2105                }
2106                risingwave_common::types::DataType::Map(map_fields) => {
2107                    schema_fields.insert(MAP_KEY, map_fields.key());
2108                    schema_fields.insert(MAP_VALUE, map_fields.value());
2109                }
2110                risingwave_common::types::DataType::List(list) => {
2111                    list.elem()
2112                        .as_struct()
2113                        .iter()
2114                        .for_each(|(name, data_type)| {
2115                            let res = schema_fields.insert(name, data_type);
2116                            // This assert is to make sure there is no duplicate field name in the schema.
2117                            assert!(res.is_none())
2118                        });
2119                }
2120                _ => {}
2121            };
2122            Some(fields.clone())
2123        }
2124        ArrowDataType::List(field) | ArrowDataType::Map(field, _) => {
2125            get_fields(our_field_type, field.data_type(), schema_fields)
2126        }
2127        _ => None, // not a supported complex type and unlikely to show up
2128    }
2129}
2130
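/// Check that every Arrow field in `fields` has a compatible counterpart in `schema_fields`
/// (looked up by field name), recursing into lists, maps, and structs. Bails on the first
/// incompatible field and returns `Ok(true)` when all fields are compatible.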
2131fn check_compatibility(
2132    schema_fields: HashMap<&str, &risingwave_common::types::DataType>,
2133    fields: &ArrowFields,
2134) -> anyhow::Result<bool> {
2135    for arrow_field in fields {
2136        let our_field_type = schema_fields
2137            .get(arrow_field.name().as_str())
2138            .ok_or_else(|| anyhow!("Field {} not found in our schema", arrow_field.name()))?;
2139
2140        // Iceberg source should be able to read iceberg decimal type.
2141        let converted_arrow_data_type = IcebergArrowConvert
2142            .to_arrow_field("", our_field_type)
2143            .map_err(|e| anyhow!(e))?
2144            .data_type()
2145            .clone();
2146
2147        let compatible = match (&converted_arrow_data_type, arrow_field.data_type()) {
2148            (ArrowDataType::Decimal128(_, _), ArrowDataType::Decimal128(_, _)) => true,
2149            (ArrowDataType::Binary, ArrowDataType::LargeBinary) => true,
2150            (ArrowDataType::LargeBinary, ArrowDataType::Binary) => true,
2151            (ArrowDataType::List(_), ArrowDataType::List(field))
2152            | (ArrowDataType::Map(_, _), ArrowDataType::Map(field, _)) => {
2153                let mut schema_fields = HashMap::new();
2154                get_fields(our_field_type, field.data_type(), &mut schema_fields)
2155                    .is_none_or(|fields| check_compatibility(schema_fields, &fields).unwrap())
2156            }
2157            // validate nested structs
2158            (ArrowDataType::Struct(_), ArrowDataType::Struct(fields)) => {
2159                let mut schema_fields = HashMap::new();
2160                our_field_type
2161                    .as_struct()
2162                    .iter()
2163                    .for_each(|(name, data_type)| {
2164                        let res = schema_fields.insert(name, data_type);
2165                        // This assert is to make sure there is no duplicate field name in the schema.
2166                        assert!(res.is_none())
2167                    });
2168                check_compatibility(schema_fields, fields)?
2169            }
2170            // cases where left != right (metadata, field name mismatch)
2171            //
2172            // all nested types: in iceberg `field_id` will always be present, but RW doesn't have it:
2173            // {"PARQUET:field_id": ".."}
2174            //
2175            // map: The standard name in arrow is "entries", "key", "value".
2176            // in iceberg-rs, it's called "key_value"
2177            (left, right) => left.equals_datatype(right),
2178        };
2179        if !compatible {
2180            bail!(
2181                "field {}'s type is incompatible\nRisingWave converted data type: {}\niceberg's data type: {}",
2182                arrow_field.name(),
2183                converted_arrow_data_type,
2184                arrow_field.data_type()
2185            );
2186        }
2187    }
2188    Ok(true)
2189}
2190
2191/// Try to match our schema with iceberg schema.
2192pub fn try_matches_arrow_schema(rw_schema: &Schema, arrow_schema: &ArrowSchema) -> Result<()> {
2193    if rw_schema.fields.len() != arrow_schema.fields().len() {
2194        bail!(
2195            "Schema length mismatch, risingwave is {}, and iceberg is {}",
2196            rw_schema.fields.len(),
2197            arrow_schema.fields.len()
2198        );
2199    }
2200
2201    let mut schema_fields = HashMap::new();
2202    rw_schema.fields.iter().for_each(|field| {
2203        let res = schema_fields.insert(field.name.as_str(), &field.data_type);
2204        // This assert is to make sure there is no duplicate field name in the schema.
2205        assert!(res.is_none())
2206    });
2207
2208    check_compatibility(schema_fields, &arrow_schema.fields)?;
2209    Ok(())
2210}
2211
2212pub fn serialize_metadata(metadata: Vec<Vec<u8>>) -> Vec<u8> {
2213    serde_json::to_vec(&metadata).unwrap()
2214}
2215
2216pub fn deserialize_metadata(bytes: Vec<u8>) -> Vec<Vec<u8>> {
2217    serde_json::from_slice(&bytes).unwrap()
2218}
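
// A round-trip sketch (the blob contents are illustrative only): the coordinator receives one
// pre-commit blob per parallel writer and stores them as a single serialized payload, e.g.
//
//     let blobs = vec![b"writer-0".to_vec(), b"writer-1".to_vec()];
//     assert_eq!(deserialize_metadata(serialize_metadata(blobs.clone())), blobs);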
2219
2220pub fn parse_partition_by_exprs(
2221    expr: String,
2222) -> std::result::Result<Vec<(String, Transform)>, anyhow::Error> {
2223    // captures column, transform(column), transform(n,column), transform(n, column)
2224    let re = Regex::new(r"(?<transform>\w+)(\(((?<n>\d+)?(?:,|(,\s)))?(?<field>\w+)\))?").unwrap();
2225    if !re.is_match(&expr) {
2226        bail!(format!(
2227            "Invalid partition fields: {}\nHINT: Supported formats are column, transform(column), transform(n,column), transform(n, column)",
2228            expr
2229        ))
2230    }
2231    let caps = re.captures_iter(&expr);
2232
2233    let mut partition_columns = vec![];
2234
2235    for mat in caps {
2236        let (column, transform) = if mat.name("n").is_none() && mat.name("field").is_none() {
2237            (&mat["transform"], Transform::Identity)
2238        } else {
2239            let mut func = mat["transform"].to_owned();
2240            if func == "bucket" || func == "truncate" {
2241                let n = &mat
2242                    .name("n")
2243                    .ok_or_else(|| anyhow!("The `n` must be set with `bucket` and `truncate`"))?
2244                    .as_str();
2245                func = format!("{func}[{n}]");
2246            }
2247            (
2248                &mat["field"],
2249                Transform::from_str(&func)
2250                    .with_context(|| format!("invalid transform function {}", func))?,
2251            )
2252        };
2253        partition_columns.push((column.to_owned(), transform));
2254    }
2255    Ok(partition_columns)
2256}
2257
2258pub fn commit_branch(sink_type: &str, write_mode: &str) -> String {
2259    if should_enable_iceberg_cow(sink_type, write_mode) {
2260        ICEBERG_COW_BRANCH.to_owned()
2261    } else {
2262        MAIN_BRANCH.to_owned()
2263    }
2264}
2265
2266pub fn should_enable_iceberg_cow(sink_type: &str, write_mode: &str) -> bool {
2267    sink_type == SINK_TYPE_UPSERT && write_mode == ICEBERG_WRITE_MODE_COPY_ON_WRITE
2268}
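
// For example, an upsert sink configured with the copy-on-write write mode commits to the
// copy-on-write branch (`ICEBERG_COW_BRANCH`), while every other combination commits to
// `MAIN_BRANCH`.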
2269
2270#[cfg(test)]
2271mod test {
2272    use std::collections::BTreeMap;
2273
2274    use risingwave_common::array::arrow::arrow_schema_iceberg::FieldRef as ArrowFieldRef;
2275    use risingwave_common::types::{DataType, MapType, StructType};
2276
2277    use crate::connector_common::IcebergCommon;
2278    use crate::sink::decouple_checkpoint_log_sink::DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITH_SINK_DECOUPLE;
2279    use crate::sink::iceberg::{
2280        COMPACTION_INTERVAL_SEC, ENABLE_COMPACTION, ENABLE_SNAPSHOT_EXPIRATION,
2281        ICEBERG_WRITE_MODE_MERGE_ON_READ, IcebergConfig, MAX_SNAPSHOTS_NUM,
2282        SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_FILES, SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_META_DATA,
2283        SNAPSHOT_EXPIRATION_MAX_AGE_MILLIS, SNAPSHOT_EXPIRATION_RETAIN_LAST, WRITE_MODE,
2284    };
2285
2286    pub const DEFAULT_ICEBERG_COMPACTION_INTERVAL: u64 = 3600; // 1 hour
2287
2288    #[test]
2289    fn test_compatible_arrow_schema() {
2290        use arrow_schema_iceberg::{DataType as ArrowDataType, Field as ArrowField};
2291
2292        use super::*;
2293        let risingwave_schema = Schema::new(vec![
2294            Field::with_name(DataType::Int32, "a"),
2295            Field::with_name(DataType::Int32, "b"),
2296            Field::with_name(DataType::Int32, "c"),
2297        ]);
2298        let arrow_schema = ArrowSchema::new(vec![
2299            ArrowField::new("a", ArrowDataType::Int32, false),
2300            ArrowField::new("b", ArrowDataType::Int32, false),
2301            ArrowField::new("c", ArrowDataType::Int32, false),
2302        ]);
2303
2304        try_matches_arrow_schema(&risingwave_schema, &arrow_schema).unwrap();
2305
2306        let risingwave_schema = Schema::new(vec![
2307            Field::with_name(DataType::Int32, "d"),
2308            Field::with_name(DataType::Int32, "c"),
2309            Field::with_name(DataType::Int32, "a"),
2310            Field::with_name(DataType::Int32, "b"),
2311        ]);
2312        let arrow_schema = ArrowSchema::new(vec![
2313            ArrowField::new("a", ArrowDataType::Int32, false),
2314            ArrowField::new("b", ArrowDataType::Int32, false),
2315            ArrowField::new("d", ArrowDataType::Int32, false),
2316            ArrowField::new("c", ArrowDataType::Int32, false),
2317        ]);
2318        try_matches_arrow_schema(&risingwave_schema, &arrow_schema).unwrap();
2319
2320        let risingwave_schema = Schema::new(vec![
2321            Field::with_name(
2322                DataType::Struct(StructType::new(vec![
2323                    ("a1", DataType::Int32),
2324                    (
2325                        "a2",
2326                        DataType::Struct(StructType::new(vec![
2327                            ("a21", DataType::Bytea),
2328                            (
2329                                "a22",
2330                                DataType::Map(MapType::from_kv(DataType::Varchar, DataType::Jsonb)),
2331                            ),
2332                        ])),
2333                    ),
2334                ])),
2335                "a",
2336            ),
2337            Field::with_name(
2338                DataType::list(DataType::Struct(StructType::new(vec![
2339                    ("b1", DataType::Int32),
2340                    ("b2", DataType::Bytea),
2341                    (
2342                        "b3",
2343                        DataType::Map(MapType::from_kv(DataType::Varchar, DataType::Jsonb)),
2344                    ),
2345                ]))),
2346                "b",
2347            ),
2348            Field::with_name(
2349                DataType::Map(MapType::from_kv(
2350                    DataType::Varchar,
2351                    DataType::list(DataType::Struct(StructType::new([
2352                        ("c1", DataType::Int32),
2353                        ("c2", DataType::Bytea),
2354                        (
2355                            "c3",
2356                            DataType::Map(MapType::from_kv(DataType::Varchar, DataType::Jsonb)),
2357                        ),
2358                    ]))),
2359                )),
2360                "c",
2361            ),
2362        ]);
2363        let arrow_schema = ArrowSchema::new(vec![
2364            ArrowField::new(
2365                "a",
2366                ArrowDataType::Struct(ArrowFields::from(vec![
2367                    ArrowField::new("a1", ArrowDataType::Int32, false),
2368                    ArrowField::new(
2369                        "a2",
2370                        ArrowDataType::Struct(ArrowFields::from(vec![
2371                            ArrowField::new("a21", ArrowDataType::LargeBinary, false),
2372                            ArrowField::new_map(
2373                                "a22",
2374                                "entries",
2375                                ArrowFieldRef::new(ArrowField::new(
2376                                    "key",
2377                                    ArrowDataType::Utf8,
2378                                    false,
2379                                )),
2380                                ArrowFieldRef::new(ArrowField::new(
2381                                    "value",
2382                                    ArrowDataType::Utf8,
2383                                    false,
2384                                )),
2385                                false,
2386                                false,
2387                            ),
2388                        ])),
2389                        false,
2390                    ),
2391                ])),
2392                false,
2393            ),
2394            ArrowField::new(
2395                "b",
2396                ArrowDataType::List(ArrowFieldRef::new(ArrowField::new_list_field(
2397                    ArrowDataType::Struct(ArrowFields::from(vec![
2398                        ArrowField::new("b1", ArrowDataType::Int32, false),
2399                        ArrowField::new("b2", ArrowDataType::LargeBinary, false),
2400                        ArrowField::new_map(
2401                            "b3",
2402                            "entries",
2403                            ArrowFieldRef::new(ArrowField::new("key", ArrowDataType::Utf8, false)),
2404                            ArrowFieldRef::new(ArrowField::new(
2405                                "value",
2406                                ArrowDataType::Utf8,
2407                                false,
2408                            )),
2409                            false,
2410                            false,
2411                        ),
2412                    ])),
2413                    false,
2414                ))),
2415                false,
2416            ),
2417            ArrowField::new_map(
2418                "c",
2419                "entries",
2420                ArrowFieldRef::new(ArrowField::new("key", ArrowDataType::Utf8, false)),
2421                ArrowFieldRef::new(ArrowField::new(
2422                    "value",
2423                    ArrowDataType::List(ArrowFieldRef::new(ArrowField::new_list_field(
2424                        ArrowDataType::Struct(ArrowFields::from(vec![
2425                            ArrowField::new("c1", ArrowDataType::Int32, false),
2426                            ArrowField::new("c2", ArrowDataType::LargeBinary, false),
2427                            ArrowField::new_map(
2428                                "c3",
2429                                "entries",
2430                                ArrowFieldRef::new(ArrowField::new(
2431                                    "key",
2432                                    ArrowDataType::Utf8,
2433                                    false,
2434                                )),
2435                                ArrowFieldRef::new(ArrowField::new(
2436                                    "value",
2437                                    ArrowDataType::Utf8,
2438                                    false,
2439                                )),
2440                                false,
2441                                false,
2442                            ),
2443                        ])),
2444                        false,
2445                    ))),
2446                    false,
2447                )),
2448                false,
2449                false,
2450            ),
2451        ]);
2452        try_matches_arrow_schema(&risingwave_schema, &arrow_schema).unwrap();
2453    }
2454
2455    #[test]
2456    fn test_parse_iceberg_config() {
2457        let values = [
2458            ("connector", "iceberg"),
2459            ("type", "upsert"),
2460            ("primary_key", "v1"),
2461            ("partition_by", "v1, identity(v1), truncate(4,v2), bucket(5,v1), year(v3), month(v4), day(v5), hour(v6), void(v1)"),
2462            ("warehouse.path", "s3://iceberg"),
2463            ("s3.endpoint", "http://127.0.0.1:9301"),
2464            ("s3.access.key", "hummockadmin"),
2465            ("s3.secret.key", "hummockadmin"),
2466            ("s3.path.style.access", "true"),
2467            ("s3.region", "us-east-1"),
2468            ("catalog.type", "jdbc"),
2469            ("catalog.name", "demo"),
2470            ("catalog.uri", "jdbc://postgresql://postgres:5432/iceberg"),
2471            ("catalog.jdbc.user", "admin"),
2472            ("catalog.jdbc.password", "123456"),
2473            ("database.name", "demo_db"),
2474            ("table.name", "demo_table"),
2475            ("enable_compaction", "true"),
2476            ("compaction_interval_sec", "1800"),
2477            ("enable_snapshot_expiration", "true"),
2478        ]
2479        .into_iter()
2480        .map(|(k, v)| (k.to_owned(), v.to_owned()))
2481        .collect();
2482
2483        let iceberg_config = IcebergConfig::from_btreemap(values).unwrap();
2484
2485        let expected_iceberg_config = IcebergConfig {
2486            common: IcebergCommon {
2487                warehouse_path: Some("s3://iceberg".to_owned()),
2488                catalog_uri: Some("jdbc://postgresql://postgres:5432/iceberg".to_owned()),
2489                region: Some("us-east-1".to_owned()),
2490                endpoint: Some("http://127.0.0.1:9301".to_owned()),
2491                access_key: Some("hummockadmin".to_owned()),
2492                secret_key: Some("hummockadmin".to_owned()),
2493                gcs_credential: None,
2494                catalog_type: Some("jdbc".to_owned()),
2495                glue_id: None,
2496                catalog_name: Some("demo".to_owned()),
2497                database_name: Some("demo_db".to_owned()),
2498                table_name: "demo_table".to_owned(),
2499                path_style_access: Some(true),
2500                credential: None,
2501                oauth2_server_uri: None,
2502                scope: None,
2503                token: None,
2504                enable_config_load: None,
2505                rest_signing_name: None,
2506                rest_signing_region: None,
2507                rest_sigv4_enabled: None,
2508                hosted_catalog: None,
2509                azblob_account_name: None,
2510                azblob_account_key: None,
2511                azblob_endpoint_url: None,
2512                header: None,
2513            },
2514            r#type: "upsert".to_owned(),
2515            force_append_only: false,
2516            primary_key: Some(vec!["v1".to_owned()]),
2517            partition_by: Some("v1, identity(v1), truncate(4,v2), bucket(5,v1), year(v3), month(v4), day(v5), hour(v6), void(v1)".to_owned()),
2518            java_catalog_props: [("jdbc.user", "admin"), ("jdbc.password", "123456")]
2519                .into_iter()
2520                .map(|(k, v)| (k.to_owned(), v.to_owned()))
2521                .collect(),
2522            commit_checkpoint_interval: DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITH_SINK_DECOUPLE,
2523            create_table_if_not_exists: false,
2524            is_exactly_once: None,
2525            commit_retry_num: 8,
2526            enable_compaction: true,
2527            compaction_interval_sec: Some(DEFAULT_ICEBERG_COMPACTION_INTERVAL / 2),
2528            enable_snapshot_expiration: true,
2529            write_mode: ICEBERG_WRITE_MODE_MERGE_ON_READ.to_owned(),
2530            snapshot_expiration_max_age_millis: None,
2531            snapshot_expiration_retain_last: None,
2532            snapshot_expiration_clear_expired_files: true,
2533            snapshot_expiration_clear_expired_meta_data: true,
2534            max_snapshots_num_before_compaction: None,
2535        };
2536
2537        assert_eq!(iceberg_config, expected_iceberg_config);
2538
2539        assert_eq!(
2540            &iceberg_config.common.full_table_name().unwrap().to_string(),
2541            "demo_db.demo_table"
2542        );
2543    }
2544
2545    async fn test_create_catalog(configs: BTreeMap<String, String>) {
2546        let iceberg_config = IcebergConfig::from_btreemap(configs).unwrap();
2547
2548        let table = iceberg_config.load_table().await.unwrap();
2549
2550        println!("{:?}", table.identifier());
2551    }
2552
2553    #[tokio::test]
2554    #[ignore]
2555    async fn test_storage_catalog() {
2556        let values = [
2557            ("connector", "iceberg"),
2558            ("type", "append-only"),
2559            ("force_append_only", "true"),
2560            ("s3.endpoint", "http://127.0.0.1:9301"),
2561            ("s3.access.key", "hummockadmin"),
2562            ("s3.secret.key", "hummockadmin"),
2563            ("s3.region", "us-east-1"),
2564            ("s3.path.style.access", "true"),
2565            ("catalog.name", "demo"),
2566            ("catalog.type", "storage"),
2567            ("warehouse.path", "s3://icebergdata/demo"),
2568            ("database.name", "s1"),
2569            ("table.name", "t1"),
2570        ]
2571        .into_iter()
2572        .map(|(k, v)| (k.to_owned(), v.to_owned()))
2573        .collect();
2574
2575        test_create_catalog(values).await;
2576    }
2577
2578    #[tokio::test]
2579    #[ignore]
2580    async fn test_rest_catalog() {
2581        let values = [
2582            ("connector", "iceberg"),
2583            ("type", "append-only"),
2584            ("force_append_only", "true"),
2585            ("s3.endpoint", "http://127.0.0.1:9301"),
2586            ("s3.access.key", "hummockadmin"),
2587            ("s3.secret.key", "hummockadmin"),
2588            ("s3.region", "us-east-1"),
2589            ("s3.path.style.access", "true"),
2590            ("catalog.name", "demo"),
2591            ("catalog.type", "rest"),
2592            ("catalog.uri", "http://192.168.167.4:8181"),
2593            ("warehouse.path", "s3://icebergdata/demo"),
2594            ("database.name", "s1"),
2595            ("table.name", "t1"),
2596        ]
2597        .into_iter()
2598        .map(|(k, v)| (k.to_owned(), v.to_owned()))
2599        .collect();
2600
2601        test_create_catalog(values).await;
2602    }
2603
2604    #[tokio::test]
2605    #[ignore]
2606    async fn test_jdbc_catalog() {
2607        let values = [
2608            ("connector", "iceberg"),
2609            ("type", "append-only"),
2610            ("force_append_only", "true"),
2611            ("s3.endpoint", "http://127.0.0.1:9301"),
2612            ("s3.access.key", "hummockadmin"),
2613            ("s3.secret.key", "hummockadmin"),
2614            ("s3.region", "us-east-1"),
2615            ("s3.path.style.access", "true"),
2616            ("catalog.name", "demo"),
2617            ("catalog.type", "jdbc"),
2618            ("catalog.uri", "jdbc:postgresql://localhost:5432/iceberg"),
2619            ("catalog.jdbc.user", "admin"),
2620            ("catalog.jdbc.password", "123456"),
2621            ("warehouse.path", "s3://icebergdata/demo"),
2622            ("database.name", "s1"),
2623            ("table.name", "t1"),
2624        ]
2625        .into_iter()
2626        .map(|(k, v)| (k.to_owned(), v.to_owned()))
2627        .collect();
2628
2629        test_create_catalog(values).await;
2630    }
2631
2632    #[tokio::test]
2633    #[ignore]
2634    async fn test_hive_catalog() {
2635        let values = [
2636            ("connector", "iceberg"),
2637            ("type", "append-only"),
2638            ("force_append_only", "true"),
2639            ("s3.endpoint", "http://127.0.0.1:9301"),
2640            ("s3.access.key", "hummockadmin"),
2641            ("s3.secret.key", "hummockadmin"),
2642            ("s3.region", "us-east-1"),
2643            ("s3.path.style.access", "true"),
2644            ("catalog.name", "demo"),
2645            ("catalog.type", "hive"),
2646            ("catalog.uri", "thrift://localhost:9083"),
2647            ("warehouse.path", "s3://icebergdata/demo"),
2648            ("database.name", "s1"),
2649            ("table.name", "t1"),
2650        ]
2651        .into_iter()
2652        .map(|(k, v)| (k.to_owned(), v.to_owned()))
2653        .collect();
2654
2655        test_create_catalog(values).await;
2656    }
2657
2658    #[test]
2659    fn test_config_constants_consistency() {
2660        // This test ensures our constants match the expected configuration names
2661        // If you change a constant, this test will remind you to update both places
2662        assert_eq!(ENABLE_COMPACTION, "enable_compaction");
2663        assert_eq!(COMPACTION_INTERVAL_SEC, "compaction_interval_sec");
2664        assert_eq!(ENABLE_SNAPSHOT_EXPIRATION, "enable_snapshot_expiration");
2665        assert_eq!(WRITE_MODE, "write_mode");
2666        assert_eq!(
2667            SNAPSHOT_EXPIRATION_RETAIN_LAST,
2668            "snapshot_expiration_retain_last"
2669        );
2670        assert_eq!(
2671            SNAPSHOT_EXPIRATION_MAX_AGE_MILLIS,
2672            "snapshot_expiration_max_age_millis"
2673        );
2674        assert_eq!(
2675            SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_FILES,
2676            "snapshot_expiration_clear_expired_files"
2677        );
2678        assert_eq!(
2679            SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_META_DATA,
2680            "snapshot_expiration_clear_expired_meta_data"
2681        );
2682        assert_eq!(MAX_SNAPSHOTS_NUM, "max_snapshots_num_before_compaction");
2683    }
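
    // A minimal sketch exercising `parse_partition_by_exprs`; the test name and the input string
    // are our own additions, and the expected transforms follow from the regex and
    // `Transform::from_str` behavior used by the function.
    #[test]
    fn test_parse_partition_by_exprs_sketch() {
        use iceberg::spec::Transform;

        use super::parse_partition_by_exprs;

        let parts =
            parse_partition_by_exprs("v1, identity(v1), bucket(5,v1), truncate(4, v2)".to_owned())
                .unwrap();
        assert_eq!(parts.len(), 4);
        // A bare column defaults to the identity transform.
        assert_eq!(parts[0].0, "v1");
        assert!(matches!(parts[0].1, Transform::Identity));
        // An explicit transform wraps the column name.
        assert_eq!(parts[1].0, "v1");
        assert!(matches!(parts[1].1, Transform::Identity));
        // `bucket` and `truncate` carry their numeric argument.
        assert_eq!(parts[2].0, "v1");
        assert!(matches!(parts[2].1, Transform::Bucket(5)));
        assert_eq!(parts[3].0, "v2");
        assert!(matches!(parts[3].1, Transform::Truncate(4)));
    }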
2684}