risingwave_connector/sink/iceberg/mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub mod exactly_once_util;
16mod prometheus;
17use std::collections::{BTreeMap, HashMap};
18use std::fmt::Debug;
19use std::num::NonZeroU64;
20use std::str::FromStr;
21use std::sync::Arc;
22use std::time::Duration;
23
24use anyhow::{Context, anyhow};
25use async_trait::async_trait;
26use await_tree::InstrumentAwait;
27use iceberg::arrow::{arrow_schema_to_schema, schema_to_arrow_schema};
28use iceberg::spec::{
29    DataFile, MAIN_BRANCH, SerializedDataFile, Transform, UnboundPartitionField,
30    UnboundPartitionSpec,
31};
32use iceberg::table::Table;
33use iceberg::transaction::Transaction;
34use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
35use iceberg::writer::base_writer::equality_delete_writer::{
36    EqualityDeleteFileWriterBuilder, EqualityDeleteWriterConfig,
37};
38use iceberg::writer::base_writer::sort_position_delete_writer::{
39    POSITION_DELETE_SCHEMA, SortPositionDeleteWriterBuilder,
40};
41use iceberg::writer::file_writer::ParquetWriterBuilder;
42use iceberg::writer::file_writer::location_generator::{
43    DefaultFileNameGenerator, DefaultLocationGenerator,
44};
45use iceberg::writer::function_writer::equality_delta_writer::{
46    DELETE_OP, EqualityDeltaWriterBuilder, INSERT_OP,
47};
48use iceberg::writer::function_writer::fanout_partition_writer::FanoutPartitionWriterBuilder;
49use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
50use iceberg::{Catalog, NamespaceIdent, TableCreation, TableIdent};
51use itertools::Itertools;
52use parquet::file::properties::WriterProperties;
53use prometheus::monitored_general_writer::MonitoredGeneralWriterBuilder;
54use prometheus::monitored_position_delete_writer::MonitoredPositionDeleteWriterBuilder;
55use regex::Regex;
56use risingwave_common::array::arrow::arrow_array_iceberg::{Int32Array, RecordBatch};
57use risingwave_common::array::arrow::arrow_schema_iceberg::{
58    self, DataType as ArrowDataType, Field as ArrowField, Fields as ArrowFields,
59    Schema as ArrowSchema, SchemaRef,
60};
61use risingwave_common::array::arrow::{IcebergArrowConvert, IcebergCreateTableArrowConvert};
62use risingwave_common::array::{Op, StreamChunk};
63use risingwave_common::bail;
64use risingwave_common::bitmap::Bitmap;
65use risingwave_common::catalog::{Field, Schema};
66use risingwave_common::metrics::{LabelGuardedHistogram, LabelGuardedIntCounter};
67use risingwave_common_estimate_size::EstimateSize;
68use risingwave_pb::connector_service::SinkMetadata;
69use risingwave_pb::connector_service::sink_metadata::Metadata::Serialized;
70use risingwave_pb::connector_service::sink_metadata::SerializedMetadata;
71use sea_orm::DatabaseConnection;
72use serde::Deserialize;
73use serde_json::from_value;
74use serde_with::{DisplayFromStr, serde_as};
75use thiserror_ext::AsReport;
76use tokio::sync::mpsc::UnboundedSender;
77use tokio_retry::Retry;
78use tokio_retry::strategy::{ExponentialBackoff, jitter};
79use tracing::warn;
80use url::Url;
81use uuid::Uuid;
82use with_options::WithOptions;
83
84use super::decouple_checkpoint_log_sink::default_commit_checkpoint_interval;
85use super::{
86    GLOBAL_SINK_METRICS, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT, Sink,
87    SinkCommittedEpochSubscriber, SinkError, SinkWriterParam,
88};
89use crate::connector_common::{IcebergCommon, IcebergSinkCompactionUpdate};
90use crate::enforce_secret::EnforceSecret;
91use crate::sink::catalog::SinkId;
92use crate::sink::coordinate::CoordinatedLogSinker;
93use crate::sink::iceberg::exactly_once_util::*;
94use crate::sink::writer::SinkWriter;
95use crate::sink::{Result, SinkCommitCoordinator, SinkParam};
96use crate::{deserialize_bool_from_string, deserialize_optional_string_seq_from_string};
97
98pub const ICEBERG_SINK: &str = "iceberg";
99pub const ICEBERG_COW_BRANCH: &str = "ingestion";
100pub const ICEBERG_WRITE_MODE_MERGE_ON_READ: &str = "merge-on-read";
101pub const ICEBERG_WRITE_MODE_COPY_ON_WRITE: &str = "copy-on-write";
102
103// Configuration constants
104pub const ENABLE_COMPACTION: &str = "enable_compaction";
105pub const COMPACTION_INTERVAL_SEC: &str = "compaction_interval_sec";
106pub const ENABLE_SNAPSHOT_EXPIRATION: &str = "enable_snapshot_expiration";
107pub const WRITE_MODE: &str = "write_mode";
108pub const SNAPSHOT_EXPIRATION_RETAIN_LAST: &str = "snapshot_expiration_retain_last";
109pub const SNAPSHOT_EXPIRATION_MAX_AGE_MILLIS: &str = "snapshot_expiration_max_age_millis";
110pub const SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_FILES: &str = "snapshot_expiration_clear_expired_files";
111pub const SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_META_DATA: &str =
112    "snapshot_expiration_clear_expired_meta_data";
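// Illustrative only: a hedged sketch of how the options above might appear in a
// `CREATE SINK ... WITH (...)` clause. The values are hypothetical, and the connection
// options coming from `IcebergCommon` (catalog, warehouse, table name, credentials) are omitted:
//
//     connector = 'iceberg',
//     type = 'upsert',
//     primary_key = 'id',
//     enable_compaction = 'true',
//     compaction_interval_sec = '3600',
//     enable_snapshot_expiration = 'true',
//     snapshot_expiration_max_age_millis = '86400000',
//     write_mode = 'merge-on-read',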
113
114fn default_commit_retry_num() -> u32 {
115    8
116}
117
118fn default_iceberg_write_mode() -> String {
119    ICEBERG_WRITE_MODE_MERGE_ON_READ.to_owned()
120}
121
122fn default_true() -> bool {
123    true
124}
125
126#[serde_as]
127#[derive(Debug, Clone, PartialEq, Eq, Deserialize, WithOptions)]
128pub struct IcebergConfig {
129    pub r#type: String, // accept "append-only" or "upsert"
130
131    #[serde(default, deserialize_with = "deserialize_bool_from_string")]
132    pub force_append_only: bool,
133
134    #[serde(flatten)]
135    common: IcebergCommon,
136
137    #[serde(
138        rename = "primary_key",
139        default,
140        deserialize_with = "deserialize_optional_string_seq_from_string"
141    )]
142    pub primary_key: Option<Vec<String>>,
143
    // Properties passed through to the Java catalog.
145    #[serde(skip)]
146    pub java_catalog_props: HashMap<String, String>,
147
148    #[serde(default)]
149    pub partition_by: Option<String>,
150
    /// Commit every n (> 0) checkpoints; the default is 10.
152    #[serde(default = "default_commit_checkpoint_interval")]
153    #[serde_as(as = "DisplayFromStr")]
154    #[with_option(allow_alter_on_fly)]
155    pub commit_checkpoint_interval: u64,
156
157    #[serde(default, deserialize_with = "deserialize_bool_from_string")]
158    pub create_table_if_not_exists: bool,
159
    /// Whether to enable exactly-once delivery. Disabled by default.
161    #[serde(default)]
162    #[serde_as(as = "Option<DisplayFromStr>")]
163    pub is_exactly_once: Option<bool>,
    // Number of commit retries when an Iceberg commit fails. The default is 8.
    // # TODO
    // The Iceberg table may store a commit retry count in its table metadata.
    // If present, that value should be preferred as the default here.
168    #[serde(default = "default_commit_retry_num")]
169    pub commit_retry_num: u32,
170
171    /// Whether to enable iceberg compaction.
172    #[serde(
173        rename = "enable_compaction",
174        default,
175        deserialize_with = "deserialize_bool_from_string"
176    )]
177    #[with_option(allow_alter_on_fly)]
178    pub enable_compaction: bool,
179
    /// The interval, in seconds, between Iceberg compaction runs.
181    #[serde(rename = "compaction_interval_sec", default)]
182    #[serde_as(as = "Option<DisplayFromStr>")]
183    #[with_option(allow_alter_on_fly)]
184    pub compaction_interval_sec: Option<u64>,
185
    /// Whether to enable Iceberg snapshot expiration.
187    #[serde(
188        rename = "enable_snapshot_expiration",
189        default,
190        deserialize_with = "deserialize_bool_from_string"
191    )]
192    #[with_option(allow_alter_on_fly)]
193    pub enable_snapshot_expiration: bool,
194
195    /// The iceberg write mode, can be `merge-on-read` or `copy-on-write`.
196    #[serde(rename = "write_mode", default = "default_iceberg_write_mode")]
197    pub write_mode: String,
198
    /// The maximum age (in milliseconds) of snapshots before they expire.
    /// For example, if set to 3600000, snapshots older than 1 hour are expired.
201    #[serde(rename = "snapshot_expiration_max_age_millis", default)]
202    #[serde_as(as = "Option<DisplayFromStr>")]
203    #[with_option(allow_alter_on_fly)]
204    pub snapshot_expiration_max_age_millis: Option<i64>,
205
206    /// The number of snapshots to retain
207    #[serde(rename = "snapshot_expiration_retain_last", default)]
208    #[serde_as(as = "Option<DisplayFromStr>")]
209    #[with_option(allow_alter_on_fly)]
210    pub snapshot_expiration_retain_last: Option<i32>,
211
212    #[serde(
213        rename = "snapshot_expiration_clear_expired_files",
214        default = "default_true",
215        deserialize_with = "deserialize_bool_from_string"
216    )]
217    #[with_option(allow_alter_on_fly)]
218    pub snapshot_expiration_clear_expired_files: bool,
219
220    #[serde(
221        rename = "snapshot_expiration_clear_expired_meta_data",
222        default = "default_true",
223        deserialize_with = "deserialize_bool_from_string"
224    )]
225    #[with_option(allow_alter_on_fly)]
226    pub snapshot_expiration_clear_expired_meta_data: bool,
227}
228
229impl EnforceSecret for IcebergConfig {
230    fn enforce_secret<'a>(
231        prop_iter: impl Iterator<Item = &'a str>,
232    ) -> crate::error::ConnectorResult<()> {
233        for prop in prop_iter {
234            IcebergCommon::enforce_one(prop)?;
235        }
236        Ok(())
237    }
238
239    fn enforce_one(prop: &str) -> crate::error::ConnectorResult<()> {
240        IcebergCommon::enforce_one(prop)
241    }
242}
243
244impl IcebergConfig {
245    pub fn from_btreemap(values: BTreeMap<String, String>) -> Result<Self> {
246        let mut config =
247            serde_json::from_value::<IcebergConfig>(serde_json::to_value(&values).unwrap())
248                .map_err(|e| SinkError::Config(anyhow!(e)))?;
249
250        if config.r#type != SINK_TYPE_APPEND_ONLY && config.r#type != SINK_TYPE_UPSERT {
251            return Err(SinkError::Config(anyhow!(
                "`{}` must be {} or {}",
253                SINK_TYPE_OPTION,
254                SINK_TYPE_APPEND_ONLY,
255                SINK_TYPE_UPSERT
256            )));
257        }
258
259        if config.r#type == SINK_TYPE_UPSERT {
260            if let Some(primary_key) = &config.primary_key {
261                if primary_key.is_empty() {
262                    return Err(SinkError::Config(anyhow!(
263                        "`primary_key` must not be empty in {}",
264                        SINK_TYPE_UPSERT
265                    )));
266                }
267            } else {
268                return Err(SinkError::Config(anyhow!(
269                    "Must set `primary_key` in {}",
270                    SINK_TYPE_UPSERT
271                )));
272            }
273        }
274
        // All configs that start with "catalog." are treated as Java catalog configs.
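        // For example (hypothetical key): `catalog.jdbc.user` is forwarded as `jdbc.user`, while
        // `catalog.uri`, `catalog.type`, `catalog.name` and `catalog.header` are filtered out
        // below and not forwarded.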
276        config.java_catalog_props = values
277            .iter()
278            .filter(|(k, _v)| {
279                k.starts_with("catalog.")
280                    && k != &"catalog.uri"
281                    && k != &"catalog.type"
282                    && k != &"catalog.name"
283                    && k != &"catalog.header"
284            })
285            .map(|(k, v)| (k[8..].to_string(), v.clone()))
286            .collect();
287
288        if config.commit_checkpoint_interval == 0 {
289            return Err(SinkError::Config(anyhow!(
290                "`commit_checkpoint_interval` must be greater than 0"
291            )));
292        }
293
294        Ok(config)
295    }
296
297    pub fn catalog_type(&self) -> &str {
298        self.common.catalog_type()
299    }
300
301    pub async fn load_table(&self) -> Result<Table> {
302        self.common
303            .load_table(&self.java_catalog_props)
304            .await
305            .map_err(Into::into)
306    }
307
308    pub async fn create_catalog(&self) -> Result<Arc<dyn Catalog>> {
309        self.common
310            .create_catalog(&self.java_catalog_props)
311            .await
312            .map_err(Into::into)
313    }
314
315    pub fn full_table_name(&self) -> Result<TableIdent> {
316        self.common.full_table_name().map_err(Into::into)
317    }
318
319    pub fn catalog_name(&self) -> String {
320        self.common.catalog_name()
321    }
322
323    pub fn compaction_interval_sec(&self) -> u64 {
324        // default to 1 hour
325        self.compaction_interval_sec.unwrap_or(3600)
326    }
327
328    /// Calculate the timestamp (in milliseconds) before which snapshots should be expired
329    /// Returns `current_time_ms` - `max_age_millis`
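    ///
    /// For example (illustrative values): with `snapshot_expiration_max_age_millis = Some(3_600_000)`
    /// (one hour) and `current_time_ms = 1_700_000_000_000`, this returns
    /// `Some(1_699_996_400_000)`, i.e. snapshots older than one hour ago become eligible for expiration.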
330    pub fn snapshot_expiration_timestamp_ms(&self, current_time_ms: i64) -> Option<i64> {
331        self.snapshot_expiration_max_age_millis
332            .map(|max_age_millis| current_time_ms - max_age_millis)
333    }
334}
335
336pub struct IcebergSink {
337    config: IcebergConfig,
338    param: SinkParam,
    // In upsert mode, this is always `Some` and non-empty.
340    unique_column_ids: Option<Vec<usize>>,
341}
342
343impl EnforceSecret for IcebergSink {
344    fn enforce_secret<'a>(
345        prop_iter: impl Iterator<Item = &'a str>,
346    ) -> crate::error::ConnectorResult<()> {
347        for prop in prop_iter {
348            IcebergConfig::enforce_one(prop)?;
349        }
350        Ok(())
351    }
352}
353
354impl TryFrom<SinkParam> for IcebergSink {
355    type Error = SinkError;
356
357    fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
358        let config = IcebergConfig::from_btreemap(param.properties.clone())?;
359        IcebergSink::new(config, param)
360    }
361}
362
363impl Debug for IcebergSink {
364    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
365        f.debug_struct("IcebergSink")
366            .field("config", &self.config)
367            .finish()
368    }
369}
370
371impl IcebergSink {
372    async fn create_and_validate_table(&self) -> Result<Table> {
373        if self.config.create_table_if_not_exists {
374            self.create_table_if_not_exists().await?;
375        }
376
377        let table = self
378            .config
379            .load_table()
380            .await
381            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
382
383        let sink_schema = self.param.schema();
384        let iceberg_arrow_schema = schema_to_arrow_schema(table.metadata().current_schema())
385            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
386
387        try_matches_arrow_schema(&sink_schema, &iceberg_arrow_schema)
388            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
389
390        Ok(table)
391    }
392
393    async fn create_table_if_not_exists(&self) -> Result<()> {
394        let catalog = self.config.create_catalog().await?;
395        let namespace = if let Some(database_name) = &self.config.common.database_name {
396            let namespace = NamespaceIdent::new(database_name.clone());
397            if !catalog
398                .namespace_exists(&namespace)
399                .await
400                .map_err(|e| SinkError::Iceberg(anyhow!(e)))?
401            {
402                catalog
403                    .create_namespace(&namespace, HashMap::default())
404                    .await
405                    .map_err(|e| SinkError::Iceberg(anyhow!(e)))
406                    .context("failed to create iceberg namespace")?;
407            }
408            namespace
409        } else {
410            bail!("database name must be set if you want to create table")
411        };
412
413        let table_id = self
414            .config
415            .full_table_name()
416            .context("Unable to parse table name")?;
417        if !catalog
418            .table_exists(&table_id)
419            .await
420            .map_err(|e| SinkError::Iceberg(anyhow!(e)))?
421        {
422            let iceberg_create_table_arrow_convert = IcebergCreateTableArrowConvert::default();
423            // convert risingwave schema -> arrow schema -> iceberg schema
424            let arrow_fields = self
425                .param
426                .columns
427                .iter()
428                .map(|column| {
429                    Ok(iceberg_create_table_arrow_convert
430                        .to_arrow_field(&column.name, &column.data_type)
431                        .map_err(|e| SinkError::Iceberg(anyhow!(e)))
432                        .context(format!(
433                            "failed to convert {}: {} to arrow type",
434                            &column.name, &column.data_type
435                        ))?)
436                })
437                .collect::<Result<Vec<ArrowField>>>()?;
438            let arrow_schema = arrow_schema_iceberg::Schema::new(arrow_fields);
439            let iceberg_schema = iceberg::arrow::arrow_schema_to_schema(&arrow_schema)
440                .map_err(|e| SinkError::Iceberg(anyhow!(e)))
441                .context("failed to convert arrow schema to iceberg schema")?;
442
443            let location = {
444                let mut names = namespace.clone().inner();
445                names.push(self.config.common.table_name.clone());
446                match &self.config.common.warehouse_path {
447                    Some(warehouse_path) => {
448                        let is_s3_tables = warehouse_path.starts_with("arn:aws:s3tables");
449                        let url = Url::parse(warehouse_path);
450                        if url.is_err() || is_s3_tables {
                            // For a REST catalog, the warehouse_path may be a warehouse name rather than a URL.
                            // In that case, leave the location unset and let the catalog determine it.
453                            if self.config.common.catalog_type() == "rest"
454                                || self.config.common.catalog_type() == "rest_rust"
455                            {
456                                None
457                            } else {
458                                bail!(format!("Invalid warehouse path: {}", warehouse_path))
459                            }
460                        } else if warehouse_path.ends_with('/') {
461                            Some(format!("{}{}", warehouse_path, names.join("/")))
462                        } else {
463                            Some(format!("{}/{}", warehouse_path, names.join("/")))
464                        }
465                    }
466                    None => None,
467                }
468            };
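            // For example (hypothetical values): with `warehouse_path = "s3://my-bucket/warehouse"`,
            // database `dev` and table `t1`, the location computed above is
            // `s3://my-bucket/warehouse/dev/t1`; when it stays `None`, the catalog picks the location.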
469
470            let partition_spec = match &self.config.partition_by {
471                Some(partition_by) => {
472                    let mut partition_fields = Vec::<UnboundPartitionField>::new();
473                    for (i, (column, transform)) in parse_partition_by_exprs(partition_by.clone())?
474                        .into_iter()
475                        .enumerate()
476                    {
477                        match iceberg_schema.field_id_by_name(&column) {
478                            Some(id) => partition_fields.push(
479                                UnboundPartitionField::builder()
480                                    .source_id(id)
481                                    .transform(transform)
482                                    .name(format!("_p_{}", column))
483                                    .field_id(i as i32)
484                                    .build(),
485                            ),
486                            None => bail!(format!(
487                                "Partition source column does not exist in schema: {}",
488                                column
489                            )),
490                        };
491                    }
492                    Some(
493                        UnboundPartitionSpec::builder()
494                            .with_spec_id(0)
495                            .add_partition_fields(partition_fields)
496                            .map_err(|e| SinkError::Iceberg(anyhow!(e)))
497                            .context("failed to add partition columns")?
498                            .build(),
499                    )
500                }
501                None => None,
502            };
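            // For example (hedged; the accepted syntax is defined by `parse_partition_by_exprs`):
            // a spec like `partition_by = 'c1, bucket(16, c2)'` would yield unbound partition fields
            // named `_p_c1` and `_p_c2` with field ids 0 and 1, bound to those source columns.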
503
504            let table_creation_builder = TableCreation::builder()
505                .name(self.config.common.table_name.clone())
506                .schema(iceberg_schema);
507
508            let table_creation = match (location, partition_spec) {
509                (Some(location), Some(partition_spec)) => table_creation_builder
510                    .location(location)
511                    .partition_spec(partition_spec)
512                    .build(),
513                (Some(location), None) => table_creation_builder.location(location).build(),
514                (None, Some(partition_spec)) => table_creation_builder
515                    .partition_spec(partition_spec)
516                    .build(),
517                (None, None) => table_creation_builder.build(),
518            };
519
520            catalog
521                .create_table(&namespace, table_creation)
522                .await
523                .map_err(|e| SinkError::Iceberg(anyhow!(e)))
524                .context("failed to create iceberg table")?;
525        }
526        Ok(())
527    }
528
529    pub fn new(config: IcebergConfig, param: SinkParam) -> Result<Self> {
530        let unique_column_ids = if config.r#type == SINK_TYPE_UPSERT && !config.force_append_only {
531            if let Some(pk) = &config.primary_key {
532                let mut unique_column_ids = Vec::with_capacity(pk.len());
533                for col_name in pk {
534                    let id = param
535                        .columns
536                        .iter()
537                        .find(|col| col.name.as_str() == col_name)
538                        .ok_or_else(|| {
539                            SinkError::Config(anyhow!(
540                                "Primary key column {} not found in sink schema",
541                                col_name
542                            ))
543                        })?
544                        .column_id
545                        .get_id() as usize;
546                    unique_column_ids.push(id);
547                }
548                Some(unique_column_ids)
549            } else {
550                unreachable!()
551            }
552        } else {
553            None
554        };
555        Ok(Self {
556            config,
557            param,
558            unique_column_ids,
559        })
560    }
561}
562
563impl Sink for IcebergSink {
564    type Coordinator = IcebergSinkCommitter;
565    type LogSinker = CoordinatedLogSinker<IcebergSinkWriter>;
566
567    const SINK_NAME: &'static str = ICEBERG_SINK;
568
569    async fn validate(&self) -> Result<()> {
570        if "snowflake".eq_ignore_ascii_case(self.config.catalog_type()) {
571            bail!("Snowflake catalog only supports iceberg sources");
572        }
573        if "glue".eq_ignore_ascii_case(self.config.catalog_type()) {
574            risingwave_common::license::Feature::IcebergSinkWithGlue
575                .check_available()
576                .map_err(|e| anyhow::anyhow!(e))?;
577        }
578
579        let _ = self.create_and_validate_table().await?;
580        Ok(())
581    }
582
583    fn validate_alter_config(config: &BTreeMap<String, String>) -> Result<()> {
584        let iceberg_config = IcebergConfig::from_btreemap(config.clone())?;
585
586        if let Some(compaction_interval) = iceberg_config.compaction_interval_sec {
587            if iceberg_config.enable_compaction && compaction_interval == 0 {
588                bail!(
589                    "`compaction_interval_sec` must be greater than 0 when `enable_compaction` is true"
590                );
591            } else {
592                // log compaction_interval
593                tracing::info!(
594                    "Alter config compaction_interval set to {} seconds",
595                    compaction_interval
596                );
597            }
598        }
599
600        Ok(())
601    }
602
603    async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
604        let table = self.create_and_validate_table().await?;
605        let inner = if let Some(unique_column_ids) = &self.unique_column_ids {
606            IcebergSinkWriter::new_upsert(table, unique_column_ids.clone(), &writer_param).await?
607        } else {
608            IcebergSinkWriter::new_append_only(table, &writer_param).await?
609        };
610
611        let commit_checkpoint_interval =
612            NonZeroU64::new(self.config.commit_checkpoint_interval).expect(
613                "commit_checkpoint_interval should be greater than 0, and it should be checked in config validation",
614            );
615        let writer = CoordinatedLogSinker::new(
616            &writer_param,
617            self.param.clone(),
618            inner,
619            commit_checkpoint_interval,
620        )
621        .await?;
622
623        Ok(writer)
624    }
625
626    fn is_coordinated_sink(&self) -> bool {
627        true
628    }
629
630    async fn new_coordinator(
631        &self,
632        db: DatabaseConnection,
633        iceberg_compact_stat_sender: Option<UnboundedSender<IcebergSinkCompactionUpdate>>,
634    ) -> Result<Self::Coordinator> {
635        let catalog = self.config.create_catalog().await?;
636        let table = self.create_and_validate_table().await?;
637        Ok(IcebergSinkCommitter {
638            catalog,
639            table,
640            is_exactly_once: self.config.is_exactly_once.unwrap_or_default(),
641            last_commit_epoch: 0,
642            sink_id: self.param.sink_id.sink_id(),
643            config: self.config.clone(),
644            param: self.param.clone(),
645            db,
646            commit_retry_num: self.config.commit_retry_num,
647            committed_epoch_subscriber: None,
648            iceberg_compact_stat_sender,
649        })
650    }
651}
652
/// `None` means no projection is needed.
/// `Prepare` holds the index of the extra partition column.
/// `Done` holds the projection index vector.
///
/// `ProjectIdxVec` is evaluated lazily: the first time the `Prepare` state is encountered, the
/// data chunk schema is used to build the projection index vector.
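///
/// For example (illustrative): with a 4-column chunk and `Prepare(2)` (extra partition column at
/// index 2), the computed projection becomes `Done(vec![0, 1, 3])`, dropping that column.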
659enum ProjectIdxVec {
660    None,
661    Prepare(usize),
662    Done(Vec<usize>),
663}
664
665pub struct IcebergSinkWriter {
666    writer: IcebergWriterDispatch,
667    arrow_schema: SchemaRef,
668    // See comments below
669    metrics: IcebergWriterMetrics,
670    // State of iceberg table for this writer
671    table: Table,
    // For chunks with an extra partition column, that column must be removed before writing.
    // The projection index vector is cached here so it is not rebuilt for every chunk.
674    project_idx_vec: ProjectIdxVec,
675}
676
677#[allow(clippy::type_complexity)]
678enum IcebergWriterDispatch {
679    PartitionAppendOnly {
680        writer: Option<Box<dyn IcebergWriter>>,
681        writer_builder: MonitoredGeneralWriterBuilder<
682            FanoutPartitionWriterBuilder<
683                DataFileWriterBuilder<
684                    ParquetWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>,
685                >,
686            >,
687        >,
688    },
689    NonpartitionAppendOnly {
690        writer: Option<Box<dyn IcebergWriter>>,
691        writer_builder: MonitoredGeneralWriterBuilder<
692            DataFileWriterBuilder<
693                ParquetWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>,
694            >,
695        >,
696    },
697    PartitionUpsert {
698        writer: Option<Box<dyn IcebergWriter>>,
699        writer_builder: MonitoredGeneralWriterBuilder<
700            FanoutPartitionWriterBuilder<
701                EqualityDeltaWriterBuilder<
702                    DataFileWriterBuilder<
703                        ParquetWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>,
704                    >,
705                    MonitoredPositionDeleteWriterBuilder<
706                        ParquetWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>,
707                    >,
708                    EqualityDeleteFileWriterBuilder<
709                        ParquetWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>,
710                    >,
711                >,
712            >,
713        >,
714        arrow_schema_with_op_column: SchemaRef,
715    },
716    NonpartitionUpsert {
717        writer: Option<Box<dyn IcebergWriter>>,
718        writer_builder: MonitoredGeneralWriterBuilder<
719            EqualityDeltaWriterBuilder<
720                DataFileWriterBuilder<
721                    ParquetWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>,
722                >,
723                MonitoredPositionDeleteWriterBuilder<
724                    ParquetWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>,
725                >,
726                EqualityDeleteFileWriterBuilder<
727                    ParquetWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>,
728                >,
729            >,
730        >,
731        arrow_schema_with_op_column: SchemaRef,
732    },
733}
734
735impl IcebergWriterDispatch {
736    pub fn get_writer(&mut self) -> Option<&mut Box<dyn IcebergWriter>> {
737        match self {
738            IcebergWriterDispatch::PartitionAppendOnly { writer, .. }
739            | IcebergWriterDispatch::NonpartitionAppendOnly { writer, .. }
740            | IcebergWriterDispatch::PartitionUpsert { writer, .. }
741            | IcebergWriterDispatch::NonpartitionUpsert { writer, .. } => writer.as_mut(),
742        }
743    }
744}
745
746pub struct IcebergWriterMetrics {
    // NOTE: These two metrics are not used directly here; they are kept only for lifecycle management.
    // They are actually used in `PrometheusWriterBuilder`:
    //     WriterMetrics::new(write_qps.deref().clone(), write_latency.deref().clone())
    // Keeping the guards here lets them clean their labels out of the metrics registry when dropped.
751    _write_qps: LabelGuardedIntCounter,
752    _write_latency: LabelGuardedHistogram,
753    write_bytes: LabelGuardedIntCounter,
754}
755
756impl IcebergSinkWriter {
757    pub async fn new_append_only(table: Table, writer_param: &SinkWriterParam) -> Result<Self> {
758        let SinkWriterParam {
759            extra_partition_col_idx,
760            actor_id,
761            sink_id,
762            sink_name,
763            ..
764        } = writer_param;
765        let metrics_labels = [
766            &actor_id.to_string(),
767            &sink_id.to_string(),
768            sink_name.as_str(),
769        ];
770
771        // Metrics
772        let write_qps = GLOBAL_SINK_METRICS
773            .iceberg_write_qps
774            .with_guarded_label_values(&metrics_labels);
775        let write_latency = GLOBAL_SINK_METRICS
776            .iceberg_write_latency
777            .with_guarded_label_values(&metrics_labels);
        // # TODO
        // Unused. Add this metric later.
780        let _rolling_unflushed_data_file = GLOBAL_SINK_METRICS
781            .iceberg_rolling_unflushed_data_file
782            .with_guarded_label_values(&metrics_labels);
783        let write_bytes = GLOBAL_SINK_METRICS
784            .iceberg_write_bytes
785            .with_guarded_label_values(&metrics_labels);
786
787        let schema = table.metadata().current_schema();
788        let partition_spec = table.metadata().default_partition_spec();
789
        // To avoid duplicate file names, each sink instance generates a unique UUID to use as a file-name suffix.
791        let unique_uuid_suffix = Uuid::now_v7();
792
793        let parquet_writer_properties = WriterProperties::builder()
794            .set_max_row_group_size(
795                writer_param
796                    .streaming_config
797                    .developer
798                    .iceberg_sink_write_parquet_max_row_group_rows,
799            )
800            .build();
801
802        let parquet_writer_builder = ParquetWriterBuilder::new(
803            parquet_writer_properties,
804            schema.clone(),
805            table.file_io().clone(),
806            DefaultLocationGenerator::new(table.metadata().clone())
807                .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
808            DefaultFileNameGenerator::new(
809                writer_param.actor_id.to_string(),
810                Some(unique_uuid_suffix.to_string()),
811                iceberg::spec::DataFileFormat::Parquet,
812            ),
813        );
814        let data_file_builder =
815            DataFileWriterBuilder::new(parquet_writer_builder, None, partition_spec.spec_id());
816        if partition_spec.fields().is_empty() {
817            let writer_builder = MonitoredGeneralWriterBuilder::new(
818                data_file_builder,
819                write_qps.clone(),
820                write_latency.clone(),
821            );
822            let inner_writer = Some(Box::new(
823                writer_builder
824                    .clone()
825                    .build()
826                    .await
827                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
828            ) as Box<dyn IcebergWriter>);
829            Ok(Self {
830                arrow_schema: Arc::new(
831                    schema_to_arrow_schema(table.metadata().current_schema())
832                        .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
833                ),
834                metrics: IcebergWriterMetrics {
835                    _write_qps: write_qps,
836                    _write_latency: write_latency,
837                    write_bytes,
838                },
839                writer: IcebergWriterDispatch::NonpartitionAppendOnly {
840                    writer: inner_writer,
841                    writer_builder,
842                },
843                table,
844                project_idx_vec: {
845                    if let Some(extra_partition_col_idx) = extra_partition_col_idx {
846                        ProjectIdxVec::Prepare(*extra_partition_col_idx)
847                    } else {
848                        ProjectIdxVec::None
849                    }
850                },
851            })
852        } else {
853            let partition_builder = MonitoredGeneralWriterBuilder::new(
854                FanoutPartitionWriterBuilder::new(
855                    data_file_builder,
856                    partition_spec.clone(),
857                    schema.clone(),
858                )
859                .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
860                write_qps.clone(),
861                write_latency.clone(),
862            );
863            let inner_writer = Some(Box::new(
864                partition_builder
865                    .clone()
866                    .build()
867                    .await
868                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
869            ) as Box<dyn IcebergWriter>);
870            Ok(Self {
871                arrow_schema: Arc::new(
872                    schema_to_arrow_schema(table.metadata().current_schema())
873                        .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
874                ),
875                metrics: IcebergWriterMetrics {
876                    _write_qps: write_qps,
877                    _write_latency: write_latency,
878                    write_bytes,
879                },
880                writer: IcebergWriterDispatch::PartitionAppendOnly {
881                    writer: inner_writer,
882                    writer_builder: partition_builder,
883                },
884                table,
885                project_idx_vec: {
886                    if let Some(extra_partition_col_idx) = extra_partition_col_idx {
887                        ProjectIdxVec::Prepare(*extra_partition_col_idx)
888                    } else {
889                        ProjectIdxVec::None
890                    }
891                },
892            })
893        }
894    }
895
896    pub async fn new_upsert(
897        table: Table,
898        unique_column_ids: Vec<usize>,
899        writer_param: &SinkWriterParam,
900    ) -> Result<Self> {
901        let SinkWriterParam {
902            extra_partition_col_idx,
903            actor_id,
904            sink_id,
905            sink_name,
906            ..
907        } = writer_param;
908        let metrics_labels = [
909            &actor_id.to_string(),
910            &sink_id.to_string(),
911            sink_name.as_str(),
912        ];
913        let unique_column_ids: Vec<_> = unique_column_ids.into_iter().map(|id| id as i32).collect();
914
915        // Metrics
916        let write_qps = GLOBAL_SINK_METRICS
917            .iceberg_write_qps
918            .with_guarded_label_values(&metrics_labels);
919        let write_latency = GLOBAL_SINK_METRICS
920            .iceberg_write_latency
921            .with_guarded_label_values(&metrics_labels);
        // # TODO
        // Unused. Add this metric later.
924        let _rolling_unflushed_data_file = GLOBAL_SINK_METRICS
925            .iceberg_rolling_unflushed_data_file
926            .with_guarded_label_values(&metrics_labels);
927        let position_delete_cache_num = GLOBAL_SINK_METRICS
928            .iceberg_position_delete_cache_num
929            .with_guarded_label_values(&metrics_labels);
930        let write_bytes = GLOBAL_SINK_METRICS
931            .iceberg_write_bytes
932            .with_guarded_label_values(&metrics_labels);
933
934        // Determine the schema id and partition spec id
935        let schema = table.metadata().current_schema();
936        let partition_spec = table.metadata().default_partition_spec();
937
        // To avoid duplicate file names, each sink instance generates a unique UUID to use as a file-name suffix.
939        let unique_uuid_suffix = Uuid::now_v7();
940
941        let parquet_writer_properties = WriterProperties::builder()
942            .set_max_row_group_size(
943                writer_param
944                    .streaming_config
945                    .developer
946                    .iceberg_sink_write_parquet_max_row_group_rows,
947            )
948            .build();
949
950        let data_file_builder = {
951            let parquet_writer_builder = ParquetWriterBuilder::new(
952                parquet_writer_properties.clone(),
953                schema.clone(),
954                table.file_io().clone(),
955                DefaultLocationGenerator::new(table.metadata().clone())
956                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
957                DefaultFileNameGenerator::new(
958                    writer_param.actor_id.to_string(),
959                    Some(unique_uuid_suffix.to_string()),
960                    iceberg::spec::DataFileFormat::Parquet,
961                ),
962            );
963            DataFileWriterBuilder::new(
964                parquet_writer_builder.clone(),
965                None,
966                partition_spec.spec_id(),
967            )
968        };
969        let position_delete_builder = {
970            let parquet_writer_builder = ParquetWriterBuilder::new(
971                parquet_writer_properties.clone(),
972                POSITION_DELETE_SCHEMA.clone(),
973                table.file_io().clone(),
974                DefaultLocationGenerator::new(table.metadata().clone())
975                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
976                DefaultFileNameGenerator::new(
977                    writer_param.actor_id.to_string(),
978                    Some(format!("pos-del-{}", unique_uuid_suffix)),
979                    iceberg::spec::DataFileFormat::Parquet,
980                ),
981            );
982            MonitoredPositionDeleteWriterBuilder::new(
983                SortPositionDeleteWriterBuilder::new(
984                    parquet_writer_builder.clone(),
985                    writer_param
986                        .streaming_config
987                        .developer
988                        .iceberg_sink_positional_delete_cache_size,
989                    None,
990                    None,
991                ),
992                position_delete_cache_num,
993            )
994        };
995        let equality_delete_builder = {
996            let config = EqualityDeleteWriterConfig::new(
997                unique_column_ids.clone(),
998                table.metadata().current_schema().clone(),
999                None,
1000                partition_spec.spec_id(),
1001            )
1002            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
1003            let parquet_writer_builder = ParquetWriterBuilder::new(
1004                parquet_writer_properties.clone(),
1005                Arc::new(
1006                    arrow_schema_to_schema(config.projected_arrow_schema_ref())
1007                        .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1008                ),
1009                table.file_io().clone(),
1010                DefaultLocationGenerator::new(table.metadata().clone())
1011                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1012                DefaultFileNameGenerator::new(
1013                    writer_param.actor_id.to_string(),
1014                    Some(format!("eq-del-{}", unique_uuid_suffix)),
1015                    iceberg::spec::DataFileFormat::Parquet,
1016                ),
1017            );
1018
1019            EqualityDeleteFileWriterBuilder::new(parquet_writer_builder.clone(), config)
1020        };
1021        let delta_builder = EqualityDeltaWriterBuilder::new(
1022            data_file_builder,
1023            position_delete_builder,
1024            equality_delete_builder,
1025            unique_column_ids,
1026        );
1027        if partition_spec.fields().is_empty() {
1028            let writer_builder = MonitoredGeneralWriterBuilder::new(
1029                delta_builder,
1030                write_qps.clone(),
1031                write_latency.clone(),
1032            );
1033            let inner_writer = Some(Box::new(
1034                writer_builder
1035                    .clone()
1036                    .build()
1037                    .await
1038                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1039            ) as Box<dyn IcebergWriter>);
1040            let original_arrow_schema = Arc::new(
1041                schema_to_arrow_schema(table.metadata().current_schema())
1042                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1043            );
1044            let schema_with_extra_op_column = {
1045                let mut new_fields = original_arrow_schema.fields().iter().cloned().collect_vec();
1046                new_fields.push(Arc::new(ArrowField::new(
1047                    "op".to_owned(),
1048                    ArrowDataType::Int32,
1049                    false,
1050                )));
1051                Arc::new(ArrowSchema::new(new_fields))
1052            };
1053            Ok(Self {
1054                arrow_schema: original_arrow_schema,
1055                metrics: IcebergWriterMetrics {
1056                    _write_qps: write_qps,
1057                    _write_latency: write_latency,
1058                    write_bytes,
1059                },
1060                table,
1061                writer: IcebergWriterDispatch::NonpartitionUpsert {
1062                    writer: inner_writer,
1063                    writer_builder,
1064                    arrow_schema_with_op_column: schema_with_extra_op_column,
1065                },
1066                project_idx_vec: {
1067                    if let Some(extra_partition_col_idx) = extra_partition_col_idx {
1068                        ProjectIdxVec::Prepare(*extra_partition_col_idx)
1069                    } else {
1070                        ProjectIdxVec::None
1071                    }
1072                },
1073            })
1074        } else {
1075            let original_arrow_schema = Arc::new(
1076                schema_to_arrow_schema(table.metadata().current_schema())
1077                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1078            );
1079            let schema_with_extra_op_column = {
1080                let mut new_fields = original_arrow_schema.fields().iter().cloned().collect_vec();
1081                new_fields.push(Arc::new(ArrowField::new(
1082                    "op".to_owned(),
1083                    ArrowDataType::Int32,
1084                    false,
1085                )));
1086                Arc::new(ArrowSchema::new(new_fields))
1087            };
1088            let partition_builder = MonitoredGeneralWriterBuilder::new(
1089                FanoutPartitionWriterBuilder::new_with_custom_schema(
1090                    delta_builder,
1091                    schema_with_extra_op_column.clone(),
1092                    partition_spec.clone(),
1093                    table.metadata().current_schema().clone(),
1094                ),
1095                write_qps.clone(),
1096                write_latency.clone(),
1097            );
1098            let inner_writer = Some(Box::new(
1099                partition_builder
1100                    .clone()
1101                    .build()
1102                    .await
1103                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1104            ) as Box<dyn IcebergWriter>);
1105            Ok(Self {
1106                arrow_schema: original_arrow_schema,
1107                metrics: IcebergWriterMetrics {
1108                    _write_qps: write_qps,
1109                    _write_latency: write_latency,
1110                    write_bytes,
1111                },
1112                table,
1113                writer: IcebergWriterDispatch::PartitionUpsert {
1114                    writer: inner_writer,
1115                    writer_builder: partition_builder,
1116                    arrow_schema_with_op_column: schema_with_extra_op_column,
1117                },
1118                project_idx_vec: {
1119                    if let Some(extra_partition_col_idx) = extra_partition_col_idx {
1120                        ProjectIdxVec::Prepare(*extra_partition_col_idx)
1121                    } else {
1122                        ProjectIdxVec::None
1123                    }
1124                },
1125            })
1126        }
1127    }
1128}
1129
1130#[async_trait]
1131impl SinkWriter for IcebergSinkWriter {
1132    type CommitMetadata = Option<SinkMetadata>;
1133
1134    /// Begin a new epoch
1135    async fn begin_epoch(&mut self, _epoch: u64) -> Result<()> {
1136        // Just skip it.
1137        Ok(())
1138    }
1139
1140    /// Write a stream chunk to sink
1141    async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> {
1142        // Try to build writer if it's None.
1143        match &mut self.writer {
1144            IcebergWriterDispatch::PartitionAppendOnly {
1145                writer,
1146                writer_builder,
1147            } => {
1148                if writer.is_none() {
1149                    *writer = Some(Box::new(
1150                        writer_builder
1151                            .clone()
1152                            .build()
1153                            .await
1154                            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1155                    ));
1156                }
1157            }
1158            IcebergWriterDispatch::NonpartitionAppendOnly {
1159                writer,
1160                writer_builder,
1161            } => {
1162                if writer.is_none() {
1163                    *writer = Some(Box::new(
1164                        writer_builder
1165                            .clone()
1166                            .build()
1167                            .await
1168                            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1169                    ));
1170                }
1171            }
1172            IcebergWriterDispatch::PartitionUpsert {
1173                writer,
1174                writer_builder,
1175                ..
1176            } => {
1177                if writer.is_none() {
1178                    *writer = Some(Box::new(
1179                        writer_builder
1180                            .clone()
1181                            .build()
1182                            .await
1183                            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1184                    ));
1185                }
1186            }
1187            IcebergWriterDispatch::NonpartitionUpsert {
1188                writer,
1189                writer_builder,
1190                ..
1191            } => {
1192                if writer.is_none() {
1193                    *writer = Some(Box::new(
1194                        writer_builder
1195                            .clone()
1196                            .build()
1197                            .await
1198                            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1199                    ));
1200                }
1201            }
1202        };
1203
1204        // Process the chunk.
1205        let (mut chunk, ops) = chunk.compact().into_parts();
1206        match &self.project_idx_vec {
1207            ProjectIdxVec::None => {}
1208            ProjectIdxVec::Prepare(idx) => {
1209                if *idx >= chunk.columns().len() {
1210                    return Err(SinkError::Iceberg(anyhow!(
1211                        "invalid extra partition column index {}",
1212                        idx
1213                    )));
1214                }
1215                let project_idx_vec = (0..*idx)
1216                    .chain(*idx + 1..chunk.columns().len())
1217                    .collect_vec();
1218                chunk = chunk.project(&project_idx_vec);
1219                self.project_idx_vec = ProjectIdxVec::Done(project_idx_vec);
1220            }
1221            ProjectIdxVec::Done(idx_vec) => {
1222                chunk = chunk.project(idx_vec);
1223            }
1224        }
1225        if ops.is_empty() {
1226            return Ok(());
1227        }
1228        let write_batch_size = chunk.estimated_heap_size();
1229        let batch = match &self.writer {
1230            IcebergWriterDispatch::PartitionAppendOnly { .. }
1231            | IcebergWriterDispatch::NonpartitionAppendOnly { .. } => {
1232                // separate out insert chunk
1233                let filters =
1234                    chunk.visibility() & ops.iter().map(|op| *op == Op::Insert).collect::<Bitmap>();
1235                chunk.set_visibility(filters);
1236                IcebergArrowConvert
1237                    .to_record_batch(self.arrow_schema.clone(), &chunk.compact())
1238                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?
1239            }
1240            IcebergWriterDispatch::PartitionUpsert {
1241                arrow_schema_with_op_column,
1242                ..
1243            }
1244            | IcebergWriterDispatch::NonpartitionUpsert {
1245                arrow_schema_with_op_column,
1246                ..
1247            } => {
1248                let chunk = IcebergArrowConvert
1249                    .to_record_batch(self.arrow_schema.clone(), &chunk)
1250                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
1251                let ops = Arc::new(Int32Array::from(
1252                    ops.iter()
1253                        .map(|op| match op {
1254                            Op::UpdateInsert | Op::Insert => INSERT_OP,
1255                            Op::UpdateDelete | Op::Delete => DELETE_OP,
1256                        })
1257                        .collect_vec(),
1258                ));
1259                let mut columns = chunk.columns().to_vec();
1260                columns.push(ops);
1261                RecordBatch::try_new(arrow_schema_with_op_column.clone(), columns)
1262                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?
1263            }
1264        };
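        // The `op` column appended above tags each row: `Insert`/`UpdateInsert` map to `INSERT_OP`
        // and `Delete`/`UpdateDelete` to `DELETE_OP`, so the equality-delta writer downstream can
        // distinguish insert rows from delete rows.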
1265
1266        let writer = self.writer.get_writer().unwrap();
1267        writer
1268            .write(batch)
1269            .instrument_await("iceberg_write")
1270            .await
1271            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
1272        self.metrics.write_bytes.inc_by(write_batch_size as _);
1273        Ok(())
1274    }
1275
1276    /// Receive a barrier and mark the end of current epoch. When `is_checkpoint` is true, the sink
1277    /// writer should commit the current epoch.
1278    async fn barrier(&mut self, is_checkpoint: bool) -> Result<Option<SinkMetadata>> {
1279        // Skip it if not checkpoint
1280        if !is_checkpoint {
1281            return Ok(None);
1282        }
1283
1284        let close_result = match &mut self.writer {
1285            IcebergWriterDispatch::PartitionAppendOnly {
1286                writer,
1287                writer_builder,
1288            } => {
1289                let close_result = match writer.take() {
1290                    Some(mut writer) => {
1291                        Some(writer.close().instrument_await("iceberg_close").await)
1292                    }
1293                    _ => None,
1294                };
1295                match writer_builder.clone().build().await {
1296                    Ok(new_writer) => {
1297                        *writer = Some(Box::new(new_writer));
1298                    }
1299                    _ => {
                        // At this point the old writer has been closed and a new one could not be built. We can't
                        // return the error here, because the close itself may have succeeded; just log it instead.
1302                        warn!("Failed to build new writer after close");
1303                    }
1304                }
1305                close_result
1306            }
1307            IcebergWriterDispatch::NonpartitionAppendOnly {
1308                writer,
1309                writer_builder,
1310            } => {
1311                let close_result = match writer.take() {
1312                    Some(mut writer) => {
1313                        Some(writer.close().instrument_await("iceberg_close").await)
1314                    }
1315                    _ => None,
1316                };
1317                match writer_builder.clone().build().await {
1318                    Ok(new_writer) => {
1319                        *writer = Some(Box::new(new_writer));
1320                    }
1321                    _ => {
                        // At this point the old writer has been closed and a new one could not be built. We can't
                        // return the error here, because the close itself may have succeeded; just log it instead.
1324                        warn!("Failed to build new writer after close");
1325                    }
1326                }
1327                close_result
1328            }
1329            IcebergWriterDispatch::PartitionUpsert {
1330                writer,
1331                writer_builder,
1332                ..
1333            } => {
1334                let close_result = match writer.take() {
1335                    Some(mut writer) => {
1336                        Some(writer.close().instrument_await("iceberg_close").await)
1337                    }
1338                    _ => None,
1339                };
1340                match writer_builder.clone().build().await {
1341                    Ok(new_writer) => {
1342                        *writer = Some(Box::new(new_writer));
1343                    }
1344                    _ => {
                        // At this point the old writer has been closed and a new one could not be built. We can't
                        // return the error here, because the close itself may have succeeded; just log it instead.
1347                        warn!("Failed to build new writer after close");
1348                    }
1349                }
1350                close_result
1351            }
1352            IcebergWriterDispatch::NonpartitionUpsert {
1353                writer,
1354                writer_builder,
1355                ..
1356            } => {
1357                let close_result = match writer.take() {
1358                    Some(mut writer) => {
1359                        Some(writer.close().instrument_await("iceberg_close").await)
1360                    }
1361                    _ => None,
1362                };
1363                match writer_builder.clone().build().await {
1364                    Ok(new_writer) => {
1365                        *writer = Some(Box::new(new_writer));
1366                    }
1367                    Err(e) => {
1368                        // In this case, the writer is closed and we can't build a new one. We can't return the
1369                        // error here either, because the current writer may have closed successfully; just log it.
1370                        warn!(error = %e.as_report(), "Failed to build new writer after close");
1371                    }
1372                }
1373                close_result
1374            }
1375        };
1376
1377        match close_result {
1378            Some(Ok(result)) => {
1379                let version = self.table.metadata().format_version() as u8;
1380                let partition_type = self.table.metadata().default_partition_type();
1381                let data_files = result
1382                    .into_iter()
1383                    .map(|f| {
1384                        SerializedDataFile::try_from(f, partition_type, version == 1)
1385                            .map_err(|err| SinkError::Iceberg(anyhow!(err)))
1386                    })
1387                    .collect::<Result<Vec<_>>>()?;
1388                Ok(Some(SinkMetadata::try_from(&IcebergCommitResult {
1389                    data_files,
1390                    schema_id: self.table.metadata().current_schema_id(),
1391                    partition_spec_id: self.table.metadata().default_partition_spec_id(),
1392                })?))
1393            }
1394            Some(Err(err)) => Err(SinkError::Iceberg(anyhow!(err))),
1395            None => Err(SinkError::Iceberg(anyhow!("No writer to close"))),
1396        }
1397    }
1398
1399    /// Clean up after a failure. Currently a no-op; see the TODO below.
1400    async fn abort(&mut self) -> Result<()> {
1401        // TODO: abort should clean up all the data written in this epoch.
1402        Ok(())
1403    }
1404}
1405
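// JSON keys used when encoding an `IcebergCommitResult` into `SinkMetadata` below.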
1406const SCHEMA_ID: &str = "schema_id";
1407const PARTITION_SPEC_ID: &str = "partition_spec_id";
1408const DATA_FILES: &str = "data_files";
1409
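/// The per-writer result of one checkpoint epoch, passed from the sink writers to the commit
/// coordinator. It is encoded into `SinkMetadata` as a JSON object of the (illustrative) form
/// `{"schema_id": 0, "partition_spec_id": 0, "data_files": [...]}`, where `data_files` holds the
/// serialized Iceberg data files written in that epoch.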
1410#[derive(Default, Clone)]
1411struct IcebergCommitResult {
1412    schema_id: i32,
1413    partition_spec_id: i32,
1414    data_files: Vec<SerializedDataFile>,
1415}
1416
1417impl IcebergCommitResult {
1418    fn try_from(value: &SinkMetadata) -> Result<Self> {
1419        if let Some(Serialized(v)) = &value.metadata {
1420            let mut values = if let serde_json::Value::Object(v) =
1421                serde_json::from_slice::<serde_json::Value>(&v.metadata)
1422                    .context("Can't parse iceberg sink metadata")?
1423            {
1424                v
1425            } else {
1426                bail!("iceberg sink metadata should be an object");
1427            };
1428
1429            let schema_id;
1430            if let Some(serde_json::Value::Number(value)) = values.remove(SCHEMA_ID) {
1431                schema_id = value
1432                    .as_u64()
1433                    .ok_or_else(|| anyhow!("schema_id should be a u64"))?;
1434            } else {
1435                bail!("iceberg sink metadata should have schema_id");
1436            }
1437
1438            let partition_spec_id;
1439            if let Some(serde_json::Value::Number(value)) = values.remove(PARTITION_SPEC_ID) {
1440                partition_spec_id = value
1441                    .as_u64()
1442                    .ok_or_else(|| anyhow!("partition_spec_id should be a u64"))?;
1443            } else {
1444                bail!("iceberg sink metadata should have partition_spec_id");
1445            }
1446
1447            let data_files: Vec<SerializedDataFile>;
1448            if let serde_json::Value::Array(values) = values
1449                .remove(DATA_FILES)
1450                .ok_or_else(|| anyhow!("iceberg sink metadata should have data_files object"))?
1451            {
1452                data_files = values
1453                    .into_iter()
1454                    .map(from_value::<SerializedDataFile>)
1455                    .collect::<std::result::Result<_, _>>()
1456                    .context("Failed to deserialize data_files in iceberg sink metadata")?;
1457            } else {
1458                bail!("iceberg sink metadata should have data_files object");
1459            }
1460
1461            Ok(Self {
1462                schema_id: schema_id as i32,
1463                partition_spec_id: partition_spec_id as i32,
1464                data_files,
1465            })
1466        } else {
1467            bail!("Can't create iceberg sink write result from empty data!")
1468        }
1469    }
1470
1471    fn try_from_sealized_bytes(value: Vec<u8>) -> Result<Self> {
1472        let mut values = if let serde_json::Value::Object(value) =
1473            serde_json::from_slice::<serde_json::Value>(&value)
1474                .context("Can't parse iceberg sink metadata")?
1475        {
1476            value
1477        } else {
1478            bail!("iceberg sink metadata should be an object");
1479        };
1480
1481        let schema_id;
1482        if let Some(serde_json::Value::Number(value)) = values.remove(SCHEMA_ID) {
1483            schema_id = value
1484                .as_u64()
1485                .ok_or_else(|| anyhow!("schema_id should be a u64"))?;
1486        } else {
1487            bail!("iceberg sink metadata should have schema_id");
1488        }
1489
1490        let partition_spec_id;
1491        if let Some(serde_json::Value::Number(value)) = values.remove(PARTITION_SPEC_ID) {
1492            partition_spec_id = value
1493                .as_u64()
1494                .ok_or_else(|| anyhow!("partition_spec_id should be a u64"))?;
1495        } else {
1496            bail!("iceberg sink metadata should have partition_spec_id");
1497        }
1498
1499        let data_files: Vec<SerializedDataFile>;
1500        if let serde_json::Value::Array(values) = values
1501            .remove(DATA_FILES)
1502            .ok_or_else(|| anyhow!("iceberg sink metadata should have data_files object"))?
1503        {
1504            data_files = values
1505                .into_iter()
1506                .map(from_value::<SerializedDataFile>)
1507                .collect::<std::result::Result<_, _>>()
1508                .context("Failed to deserialize data_files in iceberg sink metadata")?;
1509        } else {
1510            bail!("iceberg sink metadata should have data_files object");
1511        }
1512
1513        Ok(Self {
1514            schema_id: schema_id as i32,
1515            partition_spec_id: partition_spec_id as i32,
1516            data_files,
1517        })
1518    }
1519}
1520
1521impl<'a> TryFrom<&'a IcebergCommitResult> for SinkMetadata {
1522    type Error = SinkError;
1523
1524    fn try_from(value: &'a IcebergCommitResult) -> std::result::Result<SinkMetadata, Self::Error> {
1525        let json_data_files = serde_json::Value::Array(
1526            value
1527                .data_files
1528                .iter()
1529                .map(serde_json::to_value)
1530                .collect::<std::result::Result<Vec<serde_json::Value>, _>>()
1531                .context("Can't serialize data files to json")?,
1532        );
1533        let json_value = serde_json::Value::Object(
1534            vec![
1535                (
1536                    SCHEMA_ID.to_owned(),
1537                    serde_json::Value::Number(value.schema_id.into()),
1538                ),
1539                (
1540                    PARTITION_SPEC_ID.to_owned(),
1541                    serde_json::Value::Number(value.partition_spec_id.into()),
1542                ),
1543                (DATA_FILES.to_owned(), json_data_files),
1544            ]
1545            .into_iter()
1546            .collect(),
1547        );
1548        Ok(SinkMetadata {
1549            metadata: Some(Serialized(SerializedMetadata {
1550                metadata: serde_json::to_vec(&json_value)
1551                    .context("Can't serialize iceberg sink metadata")?,
1552            })),
1553        })
1554    }
1555}
1556
1557impl TryFrom<IcebergCommitResult> for Vec<u8> {
1558    type Error = SinkError;
1559
1560    fn try_from(value: IcebergCommitResult) -> std::result::Result<Vec<u8>, Self::Error> {
1561        let json_data_files = serde_json::Value::Array(
1562            value
1563                .data_files
1564                .iter()
1565                .map(serde_json::to_value)
1566                .collect::<std::result::Result<Vec<serde_json::Value>, _>>()
1567                .context("Can't serialize data files to json")?,
1568        );
1569        let json_value = serde_json::Value::Object(
1570            vec![
1571                (
1572                    SCHEMA_ID.to_owned(),
1573                    serde_json::Value::Number(value.schema_id.into()),
1574                ),
1575                (
1576                    PARTITION_SPEC_ID.to_owned(),
1577                    serde_json::Value::Number(value.partition_spec_id.into()),
1578                ),
1579                (DATA_FILES.to_owned(), json_data_files),
1580            ]
1581            .into_iter()
1582            .collect(),
1583        );
1584        Ok(serde_json::to_vec(&json_value).context("Can't serialize iceberg sink metadata")?)
1585    }
1586}
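
/// Commit coordinator for the Iceberg sink. It gathers the [`IcebergCommitResult`]s reported by the
/// parallel writers for an epoch and appends their data files to the Iceberg table in a single
/// transaction. When exactly-once is enabled, the pre-commit metadata is first persisted in the
/// system table so that the commit can be safely replayed or skipped after recovery.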
1587pub struct IcebergSinkCommitter {
1588    catalog: Arc<dyn Catalog>,
1589    table: Table,
1590    pub last_commit_epoch: u64,
1591    pub(crate) is_exactly_once: bool,
1592    pub(crate) sink_id: u32,
1593    pub(crate) config: IcebergConfig,
1594    pub(crate) param: SinkParam,
1595    pub(crate) db: DatabaseConnection,
1596    pub(crate) committed_epoch_subscriber: Option<SinkCommittedEpochSubscriber>,
1597    commit_retry_num: u32,
1598    pub(crate) iceberg_compact_stat_sender: Option<UnboundedSender<IcebergSinkCompactionUpdate>>,
1599}
1600
1601impl IcebergSinkCommitter {
1602    /// Reload the table and guarantee that its current schema id and default partition spec id
1603    /// match the given `schema_id` and `partition_spec_id`.
1604    async fn reload_table(
1605        catalog: &dyn Catalog,
1606        table_ident: &TableIdent,
1607        schema_id: i32,
1608        partition_spec_id: i32,
1609    ) -> Result<Table> {
1610        let table = catalog
1611            .load_table(table_ident)
1612            .await
1613            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
1614        if table.metadata().current_schema_id() != schema_id {
1615            return Err(SinkError::Iceberg(anyhow!(
1616                "Schema evolution not supported, expect schema id {}, but got {}",
1617                schema_id,
1618                table.metadata().current_schema_id()
1619            )));
1620        }
1621        if table.metadata().default_partition_spec_id() != partition_spec_id {
1622            return Err(SinkError::Iceberg(anyhow!(
1623                "Partition evolution not supported, expect partition spec id {}, but got {}",
1624                partition_spec_id,
1625                table.metadata().default_partition_spec_id()
1626            )));
1627        }
1628        Ok(table)
1629    }
1630}
1631
1632#[async_trait::async_trait]
1633impl SinkCommitCoordinator for IcebergSinkCommitter {
1634    async fn init(&mut self, subscriber: SinkCommittedEpochSubscriber) -> Result<Option<u64>> {
1635        if self.is_exactly_once {
1636            self.committed_epoch_subscriber = Some(subscriber);
1637            tracing::info!(
1638                "Sink id = {}: iceberg sink coordinator initializing.",
1639                self.param.sink_id.sink_id()
1640            );
1641            if iceberg_sink_has_pre_commit_metadata(&self.db, self.param.sink_id.sink_id()).await? {
1642                let ordered_metadata_list_by_end_epoch =
1643                    get_pre_commit_info_by_sink_id(&self.db, self.param.sink_id.sink_id()).await?;
1644
1645                let mut last_recommit_epoch = 0;
1646                for (end_epoch, sealized_bytes, snapshot_id, committed) in
1647                    ordered_metadata_list_by_end_epoch
1648                {
1649                    let write_results_bytes = deserialize_metadata(sealized_bytes);
1650                    let mut write_results = vec![];
1651
1652                    for each in write_results_bytes {
1653                        let write_result = IcebergCommitResult::try_from_sealized_bytes(each)?;
1654                        write_results.push(write_result);
1655                    }
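                    // Decide how to handle this pre-commit batch:
                    //  - already marked as committed in the system table: nothing to do;
                    //  - not marked, but its snapshot already exists in Iceberg: just mark it as committed;
                    //  - otherwise: re-commit the recorded data files.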
1656
1657                    match (
1658                        committed,
1659                        self.is_snapshot_id_in_iceberg(&self.config, snapshot_id)
1660                            .await?,
1661                    ) {
1662                        (true, _) => {
1663                            tracing::info!(
1664                                "Sink id = {}: all data in the log store has been written into the external sink; nothing to do during recovery.",
1665                                self.param.sink_id.sink_id()
1666                            );
1667                        }
1668                        (false, true) => {
1669                            // skip
1670                            tracing::info!(
1671                                "Sink id = {}: all pre-commit files have already been committed to Iceberg and do not need to be committed again; marking them as committed.",
1672                                self.param.sink_id.sink_id()
1673                            );
1674                            mark_row_is_committed_by_sink_id_and_end_epoch(
1675                                &self.db,
1676                                self.sink_id,
1677                                end_epoch,
1678                            )
1679                            .await?;
1680                        }
1681                        (false, false) => {
1682                            tracing::info!(
1683                                "Sink id = {}: there are files that were not successfully committed; re-commit these files.",
1684                                self.param.sink_id.sink_id()
1685                            );
1686                            self.re_commit(end_epoch, write_results, snapshot_id)
1687                                .await?;
1688                        }
1689                    }
1690
1691                    last_recommit_epoch = end_epoch;
1692                }
1693                tracing::info!(
1694                    "Sink id = {}: iceberg commit coordinator initialized.",
1695                    self.param.sink_id.sink_id()
1696                );
1697                return Ok(Some(last_recommit_epoch));
1698            } else {
1699                tracing::info!(
1700                    "Sink id = {}: iceberg coordinator initialized, and the system table is empty.",
1701                    self.param.sink_id.sink_id()
1702                );
1703                return Ok(None);
1704            }
1705        }
1706
1707        tracing::info!("Iceberg commit coordinator initialized.");
1708        Ok(None)
1709    }
1710
1711    async fn commit(
1712        &mut self,
1713        epoch: u64,
1714        metadata: Vec<SinkMetadata>,
1715        add_columns: Option<Vec<Field>>,
1716    ) -> Result<()> {
1717        tracing::info!("Starting iceberg commit in epoch {epoch}.");
1718        if let Some(add_columns) = add_columns {
1719            return Err(anyhow!(
1720                "Iceberg sink does not support adding columns, but got: {:?}",
1721                add_columns
1722            )
1723            .into());
1724        }
1725
1726        let write_results: Vec<IcebergCommitResult> = metadata
1727            .iter()
1728            .map(IcebergCommitResult::try_from)
1729            .collect::<Result<Vec<IcebergCommitResult>>>()?;
1730
1731        // Skip if no data to commit
1732        if write_results.is_empty() || write_results.iter().all(|r| r.data_files.is_empty()) {
1733            tracing::debug!(?epoch, "no data to commit");
1734            return Ok(());
1735        }
1736
1737        // Guarantee that all write results have the same schema_id and partition_spec_id.
1738        if write_results
1739            .iter()
1740            .any(|r| r.schema_id != write_results[0].schema_id)
1741            || write_results
1742                .iter()
1743                .any(|r| r.partition_spec_id != write_results[0].partition_spec_id)
1744        {
1745            return Err(SinkError::Iceberg(anyhow!(
1746                "schema_id and partition_spec_id should be the same in all write results"
1747            )));
1748        }
1749
1750        if self.is_exactly_once {
1751            assert!(self.committed_epoch_subscriber.is_some());
1752            match self.committed_epoch_subscriber.clone() {
1753                Some(committed_epoch_subscriber) => {
1754                    // Get the latest committed_epoch and the receiver
1755                    let (committed_epoch, mut rw_futures_utilrx) =
1756                        committed_epoch_subscriber(self.param.sink_id).await?;
1757                    // The exactly once commit process needs to start after the data corresponding to the current epoch is persisted in the log store.
1758                    if committed_epoch >= epoch {
1759                        self.commit_iceberg_inner(epoch, write_results, None)
1760                            .await?;
1761                    } else {
1762                        tracing::info!(
1763                            "Waiting for the committed epoch to rise. Current: {}, Waiting for: {}",
1764                            committed_epoch,
1765                            epoch
1766                        );
1767                        while let Some(next_committed_epoch) = rw_futures_utilrx.recv().await {
1768                            tracing::info!(
1769                                "Received next committed epoch: {}",
1770                                next_committed_epoch
1771                            );
1772                            // If the next committed epoch meets the condition, execute the commit immediately.
1773                            if next_committed_epoch >= epoch {
1774                                self.commit_iceberg_inner(epoch, write_results, None)
1775                                    .await?;
1776                                break;
1777                            }
1778                        }
1779                    }
1780                }
1781                None => unreachable!(
1782                    "Exactly once sink must wait epoch before committing, committed_epoch_subscriber is not initialized."
1783                ),
1784            }
1785        } else {
1786            self.commit_iceberg_inner(epoch, write_results, None)
1787                .await?;
1788        }
1789
1790        Ok(())
1791    }
1792}
1793
1794/// Methods required to achieve exactly-once semantics.
1795impl IcebergSinkCommitter {
1796    async fn re_commit(
1797        &mut self,
1798        epoch: u64,
1799        write_results: Vec<IcebergCommitResult>,
1800        snapshot_id: i64,
1801    ) -> Result<()> {
1802        tracing::info!("Starting iceberg re-commit in epoch {epoch}.");
1803
1804        // Skip if no data to commit
1805        if write_results.is_empty() || write_results.iter().all(|r| r.data_files.is_empty()) {
1806            tracing::debug!(?epoch, "no data to commit");
1807            return Ok(());
1808        }
1809        self.commit_iceberg_inner(epoch, write_results, Some(snapshot_id))
1810            .await?;
1811        Ok(())
1812    }
1813
1814    async fn commit_iceberg_inner(
1815        &mut self,
1816        epoch: u64,
1817        write_results: Vec<IcebergCommitResult>,
1818        snapshot_id: Option<i64>,
1819    ) -> Result<()> {
1820        // If the provided `snapshot_id` is not None, this commit is a re-commit occurring during the
1821        // recovery phase. In this case, we need to use the `snapshot_id` that was previously persisted
1822        // in the system table to commit.
1823        let is_first_commit = snapshot_id.is_none();
1824        self.last_commit_epoch = epoch;
1825        let expect_schema_id = write_results[0].schema_id;
1826        let expect_partition_spec_id = write_results[0].partition_spec_id;
1827
1828        // Load the latest table to avoid concurrent modification, on a best-effort basis.
1829        self.table = Self::reload_table(
1830            self.catalog.as_ref(),
1831            self.table.identifier(),
1832            expect_schema_id,
1833            expect_partition_spec_id,
1834        )
1835        .await?;
1836        let Some(schema) = self.table.metadata().schema_by_id(expect_schema_id) else {
1837            return Err(SinkError::Iceberg(anyhow!(
1838                "Can't find schema by id {}",
1839                expect_schema_id
1840            )));
1841        };
1842        let Some(partition_spec) = self
1843            .table
1844            .metadata()
1845            .partition_spec_by_id(expect_partition_spec_id)
1846        else {
1847            return Err(SinkError::Iceberg(anyhow!(
1848                "Can't find partition spec by id {}",
1849                expect_partition_spec_id
1850            )));
1851        };
1852        let partition_type = partition_spec
1853            .as_ref()
1854            .clone()
1855            .partition_type(schema)
1856            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
1857
1858        let txn = Transaction::new(&self.table);
1859        // Only generate a new snapshot id on the first commit.
1860        let snapshot_id = match snapshot_id {
1861            Some(previous_snapshot_id) => previous_snapshot_id,
1862            None => txn.generate_unique_snapshot_id(),
1863        };
1864        if self.is_exactly_once && is_first_commit {
1865            // Persist the pre-commit metadata and the snapshot id in the system table.
1866            let mut pre_commit_metadata_bytes = Vec::new();
1867            for each_parallelism_write_result in write_results.clone() {
1868                let each_parallelism_write_result_bytes: Vec<u8> =
1869                    each_parallelism_write_result.try_into()?;
1870                pre_commit_metadata_bytes.push(each_parallelism_write_result_bytes);
1871            }
1872
1873            let pre_commit_metadata_bytes: Vec<u8> = serialize_metadata(pre_commit_metadata_bytes);
1874
1875            persist_pre_commit_metadata(
1876                self.sink_id,
1877                self.db.clone(),
1878                self.last_commit_epoch,
1879                epoch,
1880                pre_commit_metadata_bytes,
1881                snapshot_id,
1882            )
1883            .await?;
1884        }
1885
1886        let data_files = write_results
1887            .into_iter()
1888            .flat_map(|r| {
1889                r.data_files.into_iter().map(|f| {
1890                    f.try_into(expect_partition_spec_id, &partition_type, schema)
1891                        .map_err(|err| SinkError::Iceberg(anyhow!(err)))
1892                })
1893            })
1894            .collect::<Result<Vec<DataFile>>>()?;
1895        // # TODO:
1896        // This retry behavior should be reverted and handled in iceberg-rust once it supports retry
1897        // (tracked in: https://github.com/apache/iceberg-rust/issues/964), because proper retry logic involves reapplying the commit metadata.
1898        // For now, we just retry the whole commit operation.
1899        let retry_strategy = ExponentialBackoff::from_millis(10)
1900            .max_delay(Duration::from_secs(60))
1901            .map(jitter)
1902            .take(self.commit_retry_num as usize);
1903        let catalog = self.catalog.clone();
1904        let table_ident = self.table.identifier().clone();
1905        let table = Retry::spawn(retry_strategy, || async {
1906            let table = Self::reload_table(
1907                catalog.as_ref(),
1908                &table_ident,
1909                expect_schema_id,
1910                expect_partition_spec_id,
1911            )
1912            .await?;
1913            let txn = Transaction::new(&table);
1914            let mut append_action = txn
1915                .fast_append(Some(snapshot_id), None, vec![])
1916                .map_err(|err| SinkError::Iceberg(anyhow!(err)))?
1917                .with_to_branch(commit_branch(
1918                    self.config.r#type.as_str(),
1919                    self.config.write_mode.as_str(),
1920                ));
1921            append_action
1922                .add_data_files(data_files.clone())
1923                .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
1924            let tx = append_action.apply().await.map_err(|err| {
1925                tracing::error!(error = %err.as_report(), "Failed to apply iceberg table");
1926                SinkError::Iceberg(anyhow!(err))
1927            })?;
1928            tx.commit(self.catalog.as_ref()).await.map_err(|err| {
1929                tracing::error!(error = %err.as_report(), "Failed to commit iceberg table");
1930                SinkError::Iceberg(anyhow!(err))
1931            })
1932        })
1933        .await?;
1934        self.table = table;
1935
1936        let snapshot_num = self.table.metadata().snapshots().count();
1937        let catalog_name = self.config.common.catalog_name();
1938        let table_name = self.table.identifier().to_string();
1939        let metrics_labels = [&self.param.sink_name, &catalog_name, &table_name];
1940        GLOBAL_SINK_METRICS
1941            .iceberg_snapshot_num
1942            .with_guarded_label_values(&metrics_labels)
1943            .set(snapshot_num as i64);
1944
1945        tracing::info!("Succeeded to commit to iceberg table in epoch {epoch}.");
1946
1947        if self.is_exactly_once {
1948            mark_row_is_committed_by_sink_id_and_end_epoch(&self.db, self.sink_id, epoch).await?;
1949            tracing::info!(
1950                "Sink id = {}: succeeded in marking pre-commit metadata in epoch {} as committed.",
1951                self.sink_id,
1952                epoch
1953            );
1954
1955            delete_row_by_sink_id_and_end_epoch(&self.db, self.sink_id, epoch).await?;
1956        }
1957        if let Some(iceberg_compact_stat_sender) = &self.iceberg_compact_stat_sender
1958            && self.config.enable_compaction
1959            && iceberg_compact_stat_sender
1960                .send(IcebergSinkCompactionUpdate {
1961                    sink_id: SinkId::new(self.sink_id),
1962                    compaction_interval: self.config.compaction_interval_sec(),
1963                })
1964                .is_err()
1965        {
1966            warn!("failed to send iceberg compaction stats");
1967        }
1968
1969        Ok(())
1970    }
1971
1972    /// During the pre-commit phase, we record the `snapshot_id` corresponding to each batch of files.
1973    /// Therefore, the logic for checking whether all files in this batch are present in Iceberg
1974    /// has been changed to verifying whether their corresponding `snapshot_id` exists in Iceberg.
1975    async fn is_snapshot_id_in_iceberg(
1976        &self,
1977        iceberg_config: &IcebergConfig,
1978        snapshot_id: i64,
1979    ) -> Result<bool> {
1980        let iceberg_common = iceberg_config.common.clone();
1981        let table = iceberg_common
1982            .load_table(&iceberg_config.java_catalog_props)
1983            .await?;
1984        Ok(table.metadata().snapshot_by_id(snapshot_id).is_some())
1989    }
1990}
1991
1992const MAP_KEY: &str = "key";
1993const MAP_VALUE: &str = "value";
1994
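/// Collects the child types of a complex RisingWave type (struct, map, or list of structs) into
/// `schema_fields`, keyed by field name, and returns the corresponding Arrow struct fields when
/// `data_type` is (or wraps) an Arrow struct; returns `None` for non-nested types.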
1995fn get_fields<'a>(
1996    our_field_type: &'a risingwave_common::types::DataType,
1997    data_type: &ArrowDataType,
1998    schema_fields: &mut HashMap<&'a str, &'a risingwave_common::types::DataType>,
1999) -> Option<ArrowFields> {
2000    match data_type {
2001        ArrowDataType::Struct(fields) => {
2002            match our_field_type {
2003                risingwave_common::types::DataType::Struct(struct_fields) => {
2004                    struct_fields.iter().for_each(|(name, data_type)| {
2005                        let res = schema_fields.insert(name, data_type);
2006                        // This assert is to make sure there is no duplicate field name in the schema.
2007                        assert!(res.is_none())
2008                    });
2009                }
2010                risingwave_common::types::DataType::Map(map_fields) => {
2011                    schema_fields.insert(MAP_KEY, map_fields.key());
2012                    schema_fields.insert(MAP_VALUE, map_fields.value());
2013                }
2014                risingwave_common::types::DataType::List(list_field) => {
2015                    list_field.as_struct().iter().for_each(|(name, data_type)| {
2016                        let res = schema_fields.insert(name, data_type);
2017                        // This assert is to make sure there is no duplicate field name in the schema.
2018                        assert!(res.is_none())
2019                    });
2020                }
2021                _ => {}
2022            };
2023            Some(fields.clone())
2024        }
2025        ArrowDataType::List(field) | ArrowDataType::Map(field, _) => {
2026            get_fields(our_field_type, field.data_type(), schema_fields)
2027        }
2028        _ => None, // not a supported complex type and unlikely to show up
2029    }
2030}
2031
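/// Checks that every Arrow (Iceberg) field has a compatible RisingWave type in `schema_fields`,
/// tolerating the representation differences listed in the match below (e.g. decimal precision,
/// `Binary` vs `LargeBinary`, and metadata-only mismatches in nested types).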
2032fn check_compatibility(
2033    schema_fields: HashMap<&str, &risingwave_common::types::DataType>,
2034    fields: &ArrowFields,
2035) -> anyhow::Result<bool> {
2036    for arrow_field in fields {
2037        let our_field_type = schema_fields
2038            .get(arrow_field.name().as_str())
2039            .ok_or_else(|| anyhow!("Field {} not found in our schema", arrow_field.name()))?;
2040
2041        // Iceberg source should be able to read iceberg decimal type.
2042        let converted_arrow_data_type = IcebergArrowConvert
2043            .to_arrow_field("", our_field_type)
2044            .map_err(|e| anyhow!(e))?
2045            .data_type()
2046            .clone();
2047
2048        let compatible = match (&converted_arrow_data_type, arrow_field.data_type()) {
2049            (ArrowDataType::Decimal128(_, _), ArrowDataType::Decimal128(_, _)) => true,
2050            (ArrowDataType::Binary, ArrowDataType::LargeBinary) => true,
2051            (ArrowDataType::LargeBinary, ArrowDataType::Binary) => true,
2052            (ArrowDataType::List(_), ArrowDataType::List(field))
2053            | (ArrowDataType::Map(_, _), ArrowDataType::Map(field, _)) => {
2054                let mut schema_fields = HashMap::new();
2055                get_fields(our_field_type, field.data_type(), &mut schema_fields)
2056                    .is_none_or(|fields| check_compatibility(schema_fields, &fields).unwrap())
2057            }
2058            // validate nested structs
2059            (ArrowDataType::Struct(_), ArrowDataType::Struct(fields)) => {
2060                let mut schema_fields = HashMap::new();
2061                our_field_type
2062                    .as_struct()
2063                    .iter()
2064                    .for_each(|(name, data_type)| {
2065                        let res = schema_fields.insert(name, data_type);
2066                        // This assert is to make sure there is no duplicate field name in the schema.
2067                        assert!(res.is_none())
2068                    });
2069                check_compatibility(schema_fields, fields)?
2070            }
2071            // cases where left != right (metadata, field name mismatch)
2072            //
2073            // all nested types: in iceberg `field_id` will always be present, but RW doesn't have it:
2074            // {"PARQUET:field_id": ".."}
2075            //
2076            // map: The standard name in arrow is "entries", "key", "value".
2077            // in iceberg-rs, it's called "key_value"
2078            (left, right) => left.equals_datatype(right),
2079        };
2080        if !compatible {
2081            bail!(
2082                "field {}'s type is incompatible\nRisingWave converted data type: {}\niceberg's data type: {}",
2083                arrow_field.name(),
2084                converted_arrow_data_type,
2085                arrow_field.data_type()
2086            );
2087        }
2088    }
2089    Ok(true)
2090}
2091
2092/// Try to match our schema with iceberg schema.
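/// Matching is by field name and is order-insensitive; see `check_compatibility` for the
/// representation differences that are tolerated.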
2093pub fn try_matches_arrow_schema(rw_schema: &Schema, arrow_schema: &ArrowSchema) -> Result<()> {
2094    if rw_schema.fields.len() != arrow_schema.fields().len() {
2095        bail!(
2096            "Schema length mismatch: RisingWave schema has {} fields, but Iceberg schema has {}",
2097            rw_schema.fields.len(),
2098            arrow_schema.fields.len()
2099        );
2100    }
2101
2102    let mut schema_fields = HashMap::new();
2103    rw_schema.fields.iter().for_each(|field| {
2104        let res = schema_fields.insert(field.name.as_str(), &field.data_type);
2105        // This assert is to make sure there is no duplicate field name in the schema.
2106        assert!(res.is_none())
2107    });
2108
2109    check_compatibility(schema_fields, &arrow_schema.fields)?;
2110    Ok(())
2111}
2112
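/// Encodes the per-writer pre-commit payloads as a single buffer (a JSON array of byte arrays);
/// `deserialize_metadata` below is its inverse. A minimal sketch of the round trip (illustrative,
/// not run as a doc-test):
///
/// ```ignore
/// let payloads = vec![b"a".to_vec(), b"bc".to_vec()];
/// let bytes = serialize_metadata(payloads.clone());
/// assert_eq!(deserialize_metadata(bytes), payloads);
/// ```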
2113pub fn serialize_metadata(metadata: Vec<Vec<u8>>) -> Vec<u8> {
2114    serde_json::to_vec(&metadata).unwrap()
2115}
2116
2117pub fn deserialize_metadata(bytes: Vec<u8>) -> Vec<Vec<u8>> {
2118    serde_json::from_slice(&bytes).unwrap()
2119}
2120
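/// Parses a `partition_by` expression such as `"v1, bucket(5,v1), truncate(4,v2)"` into
/// `(column, Transform)` pairs. A sketch of the expected mapping, assuming the usual
/// `iceberg::spec::Transform` variant names (illustrative, not run as a doc-test):
///
/// ```ignore
/// let parts = parse_partition_by_exprs("v1, bucket(5,v1)".to_owned()).unwrap();
/// assert_eq!(parts[0], ("v1".to_owned(), Transform::Identity));
/// assert_eq!(parts[1], ("v1".to_owned(), Transform::Bucket(5)));
/// ```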
2121pub fn parse_partition_by_exprs(
2122    expr: String,
2123) -> std::result::Result<Vec<(String, Transform)>, anyhow::Error> {
2124    // captures column, transform(column), transform(n,column), transform(n, column)
2125    let re = Regex::new(r"(?<transform>\w+)(\(((?<n>\d+)?(?:,|(,\s)))?(?<field>\w+)\))?").unwrap();
2126    if !re.is_match(&expr) {
2127        bail!(format!(
2128            "Invalid partition fields: {}\nHINT: Supported formats are column, transform(column), transform(n,column), transform(n, column)",
2129            expr
2130        ))
2131    }
2132    let caps = re.captures_iter(&expr);
2133
2134    let mut partition_columns = vec![];
2135
2136    for mat in caps {
2137        let (column, transform) = if mat.name("n").is_none() && mat.name("field").is_none() {
2138            (&mat["transform"], Transform::Identity)
2139        } else {
2140            let mut func = mat["transform"].to_owned();
2141            if func == "bucket" || func == "truncate" {
2142                let n = &mat
2143                    .name("n")
2144                    .ok_or_else(|| anyhow!("`n` must be set for `bucket` and `truncate`"))?
2145                    .as_str();
2146                func = format!("{func}[{n}]");
2147            }
2148            (
2149                &mat["field"],
2150                Transform::from_str(&func)
2151                    .with_context(|| format!("invalid transform function {}", func))?,
2152            )
2153        };
2154        partition_columns.push((column.to_owned(), transform));
2155    }
2156    Ok(partition_columns)
2157}
2158
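/// Upsert sinks in copy-on-write mode commit to the dedicated `ICEBERG_COW_BRANCH`; every other
/// configuration commits to the Iceberg `MAIN_BRANCH`.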
2159pub fn commit_branch(sink_type: &str, write_mode: &str) -> String {
2160    if should_enable_iceberg_cow(sink_type, write_mode) {
2161        ICEBERG_COW_BRANCH.to_owned()
2162    } else {
2163        MAIN_BRANCH.to_owned()
2164    }
2165}
2166
2167pub fn should_enable_iceberg_cow(sink_type: &str, write_mode: &str) -> bool {
2168    sink_type == SINK_TYPE_UPSERT && write_mode == ICEBERG_WRITE_MODE_COPY_ON_WRITE
2169}
2170
2171#[cfg(test)]
2172mod test {
2173    use std::collections::BTreeMap;
2174
2175    use risingwave_common::array::arrow::arrow_schema_iceberg::FieldRef as ArrowFieldRef;
2176    use risingwave_common::types::{DataType, MapType, StructType};
2177
2178    use crate::connector_common::IcebergCommon;
2179    use crate::sink::decouple_checkpoint_log_sink::DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITH_SINK_DECOUPLE;
2180    use crate::sink::iceberg::{
2181        COMPACTION_INTERVAL_SEC, ENABLE_COMPACTION, ENABLE_SNAPSHOT_EXPIRATION,
2182        ICEBERG_WRITE_MODE_MERGE_ON_READ, IcebergConfig, SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_FILES,
2183        SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_META_DATA, SNAPSHOT_EXPIRATION_MAX_AGE_MILLIS,
2184        SNAPSHOT_EXPIRATION_RETAIN_LAST, WRITE_MODE,
2185    };
2186
2187    pub const DEFAULT_ICEBERG_COMPACTION_INTERVAL: u64 = 3600; // 1 hour
2188
2189    #[test]
2190    fn test_compatible_arrow_schema() {
2191        use arrow_schema_iceberg::{DataType as ArrowDataType, Field as ArrowField};
2192
2193        use super::*;
2194        let risingwave_schema = Schema::new(vec![
2195            Field::with_name(DataType::Int32, "a"),
2196            Field::with_name(DataType::Int32, "b"),
2197            Field::with_name(DataType::Int32, "c"),
2198        ]);
2199        let arrow_schema = ArrowSchema::new(vec![
2200            ArrowField::new("a", ArrowDataType::Int32, false),
2201            ArrowField::new("b", ArrowDataType::Int32, false),
2202            ArrowField::new("c", ArrowDataType::Int32, false),
2203        ]);
2204
2205        try_matches_arrow_schema(&risingwave_schema, &arrow_schema).unwrap();
2206
2207        let risingwave_schema = Schema::new(vec![
2208            Field::with_name(DataType::Int32, "d"),
2209            Field::with_name(DataType::Int32, "c"),
2210            Field::with_name(DataType::Int32, "a"),
2211            Field::with_name(DataType::Int32, "b"),
2212        ]);
2213        let arrow_schema = ArrowSchema::new(vec![
2214            ArrowField::new("a", ArrowDataType::Int32, false),
2215            ArrowField::new("b", ArrowDataType::Int32, false),
2216            ArrowField::new("d", ArrowDataType::Int32, false),
2217            ArrowField::new("c", ArrowDataType::Int32, false),
2218        ]);
2219        try_matches_arrow_schema(&risingwave_schema, &arrow_schema).unwrap();
2220
2221        let risingwave_schema = Schema::new(vec![
2222            Field::with_name(
2223                DataType::Struct(StructType::new(vec![
2224                    ("a1", DataType::Int32),
2225                    (
2226                        "a2",
2227                        DataType::Struct(StructType::new(vec![
2228                            ("a21", DataType::Bytea),
2229                            (
2230                                "a22",
2231                                DataType::Map(MapType::from_kv(DataType::Varchar, DataType::Jsonb)),
2232                            ),
2233                        ])),
2234                    ),
2235                ])),
2236                "a",
2237            ),
2238            Field::with_name(
2239                DataType::List(Box::new(DataType::Struct(StructType::new(vec![
2240                    ("b1", DataType::Int32),
2241                    ("b2", DataType::Bytea),
2242                    (
2243                        "b3",
2244                        DataType::Map(MapType::from_kv(DataType::Varchar, DataType::Jsonb)),
2245                    ),
2246                ])))),
2247                "b",
2248            ),
2249            Field::with_name(
2250                DataType::Map(MapType::from_kv(
2251                    DataType::Varchar,
2252                    DataType::List(Box::new(DataType::Struct(StructType::new([
2253                        ("c1", DataType::Int32),
2254                        ("c2", DataType::Bytea),
2255                        (
2256                            "c3",
2257                            DataType::Map(MapType::from_kv(DataType::Varchar, DataType::Jsonb)),
2258                        ),
2259                    ])))),
2260                )),
2261                "c",
2262            ),
2263        ]);
2264        let arrow_schema = ArrowSchema::new(vec![
2265            ArrowField::new(
2266                "a",
2267                ArrowDataType::Struct(ArrowFields::from(vec![
2268                    ArrowField::new("a1", ArrowDataType::Int32, false),
2269                    ArrowField::new(
2270                        "a2",
2271                        ArrowDataType::Struct(ArrowFields::from(vec![
2272                            ArrowField::new("a21", ArrowDataType::LargeBinary, false),
2273                            ArrowField::new_map(
2274                                "a22",
2275                                "entries",
2276                                ArrowFieldRef::new(ArrowField::new(
2277                                    "key",
2278                                    ArrowDataType::Utf8,
2279                                    false,
2280                                )),
2281                                ArrowFieldRef::new(ArrowField::new(
2282                                    "value",
2283                                    ArrowDataType::Utf8,
2284                                    false,
2285                                )),
2286                                false,
2287                                false,
2288                            ),
2289                        ])),
2290                        false,
2291                    ),
2292                ])),
2293                false,
2294            ),
2295            ArrowField::new(
2296                "b",
2297                ArrowDataType::List(ArrowFieldRef::new(ArrowField::new_list_field(
2298                    ArrowDataType::Struct(ArrowFields::from(vec![
2299                        ArrowField::new("b1", ArrowDataType::Int32, false),
2300                        ArrowField::new("b2", ArrowDataType::LargeBinary, false),
2301                        ArrowField::new_map(
2302                            "b3",
2303                            "entries",
2304                            ArrowFieldRef::new(ArrowField::new("key", ArrowDataType::Utf8, false)),
2305                            ArrowFieldRef::new(ArrowField::new(
2306                                "value",
2307                                ArrowDataType::Utf8,
2308                                false,
2309                            )),
2310                            false,
2311                            false,
2312                        ),
2313                    ])),
2314                    false,
2315                ))),
2316                false,
2317            ),
2318            ArrowField::new_map(
2319                "c",
2320                "entries",
2321                ArrowFieldRef::new(ArrowField::new("key", ArrowDataType::Utf8, false)),
2322                ArrowFieldRef::new(ArrowField::new(
2323                    "value",
2324                    ArrowDataType::List(ArrowFieldRef::new(ArrowField::new_list_field(
2325                        ArrowDataType::Struct(ArrowFields::from(vec![
2326                            ArrowField::new("c1", ArrowDataType::Int32, false),
2327                            ArrowField::new("c2", ArrowDataType::LargeBinary, false),
2328                            ArrowField::new_map(
2329                                "c3",
2330                                "entries",
2331                                ArrowFieldRef::new(ArrowField::new(
2332                                    "key",
2333                                    ArrowDataType::Utf8,
2334                                    false,
2335                                )),
2336                                ArrowFieldRef::new(ArrowField::new(
2337                                    "value",
2338                                    ArrowDataType::Utf8,
2339                                    false,
2340                                )),
2341                                false,
2342                                false,
2343                            ),
2344                        ])),
2345                        false,
2346                    ))),
2347                    false,
2348                )),
2349                false,
2350                false,
2351            ),
2352        ]);
2353        try_matches_arrow_schema(&risingwave_schema, &arrow_schema).unwrap();
2354    }
2355
2356    #[test]
2357    fn test_parse_iceberg_config() {
2358        let values = [
2359            ("connector", "iceberg"),
2360            ("type", "upsert"),
2361            ("primary_key", "v1"),
2362            ("partition_by", "v1, identity(v1), truncate(4,v2), bucket(5,v1), year(v3), month(v4), day(v5), hour(v6), void(v1)"),
2363            ("warehouse.path", "s3://iceberg"),
2364            ("s3.endpoint", "http://127.0.0.1:9301"),
2365            ("s3.access.key", "hummockadmin"),
2366            ("s3.secret.key", "hummockadmin"),
2367            ("s3.path.style.access", "true"),
2368            ("s3.region", "us-east-1"),
2369            ("catalog.type", "jdbc"),
2370            ("catalog.name", "demo"),
2371            ("catalog.uri", "jdbc://postgresql://postgres:5432/iceberg"),
2372            ("catalog.jdbc.user", "admin"),
2373            ("catalog.jdbc.password", "123456"),
2374            ("database.name", "demo_db"),
2375            ("table.name", "demo_table"),
2376            ("enable_compaction", "true"),
2377            ("compaction_interval_sec", "1800"),
2378            ("enable_snapshot_expiration", "true"),
2379        ]
2380        .into_iter()
2381        .map(|(k, v)| (k.to_owned(), v.to_owned()))
2382        .collect();
2383
2384        let iceberg_config = IcebergConfig::from_btreemap(values).unwrap();
2385
2386        let expected_iceberg_config = IcebergConfig {
2387            common: IcebergCommon {
2388                warehouse_path: Some("s3://iceberg".to_owned()),
2389                catalog_uri: Some("jdbc://postgresql://postgres:5432/iceberg".to_owned()),
2390                region: Some("us-east-1".to_owned()),
2391                endpoint: Some("http://127.0.0.1:9301".to_owned()),
2392                access_key: Some("hummockadmin".to_owned()),
2393                secret_key: Some("hummockadmin".to_owned()),
2394                gcs_credential: None,
2395                catalog_type: Some("jdbc".to_owned()),
2396                glue_id: None,
2397                catalog_name: Some("demo".to_owned()),
2398                database_name: Some("demo_db".to_owned()),
2399                table_name: "demo_table".to_owned(),
2400                path_style_access: Some(true),
2401                credential: None,
2402                oauth2_server_uri: None,
2403                scope: None,
2404                token: None,
2405                enable_config_load: None,
2406                rest_signing_name: None,
2407                rest_signing_region: None,
2408                rest_sigv4_enabled: None,
2409                hosted_catalog: None,
2410                azblob_account_name: None,
2411                azblob_account_key: None,
2412                azblob_endpoint_url: None,
2413                header: None,
2414            },
2415            r#type: "upsert".to_owned(),
2416            force_append_only: false,
2417            primary_key: Some(vec!["v1".to_owned()]),
2418            partition_by: Some("v1, identity(v1), truncate(4,v2), bucket(5,v1), year(v3), month(v4), day(v5), hour(v6), void(v1)".to_owned()),
2419            java_catalog_props: [("jdbc.user", "admin"), ("jdbc.password", "123456")]
2420                .into_iter()
2421                .map(|(k, v)| (k.to_owned(), v.to_owned()))
2422                .collect(),
2423            commit_checkpoint_interval: DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITH_SINK_DECOUPLE,
2424            create_table_if_not_exists: false,
2425            is_exactly_once: None,
2426            commit_retry_num: 8,
2427            enable_compaction: true,
2428            compaction_interval_sec: Some(DEFAULT_ICEBERG_COMPACTION_INTERVAL / 2),
2429            enable_snapshot_expiration: true,
2430            write_mode: ICEBERG_WRITE_MODE_MERGE_ON_READ.to_owned(),
2431            snapshot_expiration_max_age_millis: None,
2432            snapshot_expiration_retain_last: None,
2433            snapshot_expiration_clear_expired_files: true,
2434            snapshot_expiration_clear_expired_meta_data: true
2435        };
2436
2437        assert_eq!(iceberg_config, expected_iceberg_config);
2438
2439        assert_eq!(
2440            &iceberg_config.common.full_table_name().unwrap().to_string(),
2441            "demo_db.demo_table"
2442        );
2443    }
2444
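    // A minimal round-trip check for the pre-commit metadata encoding defined in this module.
    // The concrete field values below are arbitrary and only serve as an illustration.
    #[test]
    fn test_commit_result_metadata_roundtrip() {
        use super::*;

        let result = IcebergCommitResult {
            schema_id: 1,
            partition_spec_id: 2,
            data_files: vec![],
        };
        let bytes: Vec<u8> = result.try_into().unwrap();
        let decoded = IcebergCommitResult::try_from_sealized_bytes(bytes).unwrap();
        assert_eq!(decoded.schema_id, 1);
        assert_eq!(decoded.partition_spec_id, 2);
        assert!(decoded.data_files.is_empty());
    }
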
2445    async fn test_create_catalog(configs: BTreeMap<String, String>) {
2446        let iceberg_config = IcebergConfig::from_btreemap(configs).unwrap();
2447
2448        let table = iceberg_config.load_table().await.unwrap();
2449
2450        println!("{:?}", table.identifier());
2451    }
2452
2453    #[tokio::test]
2454    #[ignore]
2455    async fn test_storage_catalog() {
2456        let values = [
2457            ("connector", "iceberg"),
2458            ("type", "append-only"),
2459            ("force_append_only", "true"),
2460            ("s3.endpoint", "http://127.0.0.1:9301"),
2461            ("s3.access.key", "hummockadmin"),
2462            ("s3.secret.key", "hummockadmin"),
2463            ("s3.region", "us-east-1"),
2464            ("s3.path.style.access", "true"),
2465            ("catalog.name", "demo"),
2466            ("catalog.type", "storage"),
2467            ("warehouse.path", "s3://icebergdata/demo"),
2468            ("database.name", "s1"),
2469            ("table.name", "t1"),
2470        ]
2471        .into_iter()
2472        .map(|(k, v)| (k.to_owned(), v.to_owned()))
2473        .collect();
2474
2475        test_create_catalog(values).await;
2476    }
2477
2478    #[tokio::test]
2479    #[ignore]
2480    async fn test_rest_catalog() {
2481        let values = [
2482            ("connector", "iceberg"),
2483            ("type", "append-only"),
2484            ("force_append_only", "true"),
2485            ("s3.endpoint", "http://127.0.0.1:9301"),
2486            ("s3.access.key", "hummockadmin"),
2487            ("s3.secret.key", "hummockadmin"),
2488            ("s3.region", "us-east-1"),
2489            ("s3.path.style.access", "true"),
2490            ("catalog.name", "demo"),
2491            ("catalog.type", "rest"),
2492            ("catalog.uri", "http://192.168.167.4:8181"),
2493            ("warehouse.path", "s3://icebergdata/demo"),
2494            ("database.name", "s1"),
2495            ("table.name", "t1"),
2496        ]
2497        .into_iter()
2498        .map(|(k, v)| (k.to_owned(), v.to_owned()))
2499        .collect();
2500
2501        test_create_catalog(values).await;
2502    }
2503
2504    #[tokio::test]
2505    #[ignore]
2506    async fn test_jdbc_catalog() {
2507        let values = [
2508            ("connector", "iceberg"),
2509            ("type", "append-only"),
2510            ("force_append_only", "true"),
2511            ("s3.endpoint", "http://127.0.0.1:9301"),
2512            ("s3.access.key", "hummockadmin"),
2513            ("s3.secret.key", "hummockadmin"),
2514            ("s3.region", "us-east-1"),
2515            ("s3.path.style.access", "true"),
2516            ("catalog.name", "demo"),
2517            ("catalog.type", "jdbc"),
2518            ("catalog.uri", "jdbc:postgresql://localhost:5432/iceberg"),
2519            ("catalog.jdbc.user", "admin"),
2520            ("catalog.jdbc.password", "123456"),
2521            ("warehouse.path", "s3://icebergdata/demo"),
2522            ("database.name", "s1"),
2523            ("table.name", "t1"),
2524        ]
2525        .into_iter()
2526        .map(|(k, v)| (k.to_owned(), v.to_owned()))
2527        .collect();
2528
2529        test_create_catalog(values).await;
2530    }
2531
2532    #[tokio::test]
2533    #[ignore]
2534    async fn test_hive_catalog() {
2535        let values = [
2536            ("connector", "iceberg"),
2537            ("type", "append-only"),
2538            ("force_append_only", "true"),
2539            ("s3.endpoint", "http://127.0.0.1:9301"),
2540            ("s3.access.key", "hummockadmin"),
2541            ("s3.secret.key", "hummockadmin"),
2542            ("s3.region", "us-east-1"),
2543            ("s3.path.style.access", "true"),
2544            ("catalog.name", "demo"),
2545            ("catalog.type", "hive"),
2546            ("catalog.uri", "thrift://localhost:9083"),
2547            ("warehouse.path", "s3://icebergdata/demo"),
2548            ("database.name", "s1"),
2549            ("table.name", "t1"),
2550        ]
2551        .into_iter()
2552        .map(|(k, v)| (k.to_owned(), v.to_owned()))
2553        .collect();
2554
2555        test_create_catalog(values).await;
2556    }
2557
2558    #[test]
2559    fn test_config_constants_consistency() {
2560        // This test ensures our constants match the expected configuration names
2561        // If you change a constant, this test will remind you to update both places
2562        assert_eq!(ENABLE_COMPACTION, "enable_compaction");
2563        assert_eq!(COMPACTION_INTERVAL_SEC, "compaction_interval_sec");
2564        assert_eq!(ENABLE_SNAPSHOT_EXPIRATION, "enable_snapshot_expiration");
2565        assert_eq!(WRITE_MODE, "write_mode");
2566        assert_eq!(
2567            SNAPSHOT_EXPIRATION_RETAIN_LAST,
2568            "snapshot_expiration_retain_last"
2569        );
2570        assert_eq!(
2571            SNAPSHOT_EXPIRATION_MAX_AGE_MILLIS,
2572            "snapshot_expiration_max_age_millis"
2573        );
2574        assert_eq!(
2575            SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_FILES,
2576            "snapshot_expiration_clear_expired_files"
2577        );
2578        assert_eq!(
2579            SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_META_DATA,
2580            "snapshot_expiration_clear_expired_meta_data"
2581        );
2582    }
2583}