risingwave_connector/sink/iceberg/mod.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

mod prometheus;
use std::collections::{BTreeMap, HashMap};
use std::fmt::Debug;
use std::num::NonZeroU64;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;

use anyhow::{Context, anyhow};
use async_trait::async_trait;
use await_tree::InstrumentAwait;
use iceberg::arrow::{
    RecordBatchPartitionSplitter, arrow_schema_to_schema, schema_to_arrow_schema,
};
use iceberg::spec::{
    DataFile, MAIN_BRANCH, Operation, PartitionSpecRef, SchemaRef as IcebergSchemaRef,
    SerializedDataFile, Transform, UnboundPartitionField, UnboundPartitionSpec,
};
use iceberg::table::Table;
use iceberg::transaction::{ApplyTransactionAction, FastAppendAction, Transaction};
use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
use iceberg::writer::base_writer::equality_delete_writer::{
    EqualityDeleteFileWriterBuilder, EqualityDeleteWriterConfig,
};
use iceberg::writer::base_writer::position_delete_file_writer::{
    POSITION_DELETE_SCHEMA, PositionDeleteFileWriterBuilder,
};
use iceberg::writer::delta_writer::{DELETE_OP, DeltaWriterBuilder, INSERT_OP};
use iceberg::writer::file_writer::ParquetWriterBuilder;
use iceberg::writer::file_writer::location_generator::{
    DefaultFileNameGenerator, DefaultLocationGenerator,
};
use iceberg::writer::file_writer::rolling_writer::RollingFileWriterBuilder;
use iceberg::writer::task_writer::TaskWriter;
use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
use iceberg::{Catalog, NamespaceIdent, TableCreation, TableIdent};
use itertools::Itertools;
use parquet::file::properties::WriterProperties;
use prometheus::monitored_general_writer::MonitoredGeneralWriterBuilder;
use regex::Regex;
use risingwave_common::array::arrow::arrow_array_iceberg::{Int32Array, RecordBatch};
use risingwave_common::array::arrow::arrow_schema_iceberg::{
    self, DataType as ArrowDataType, Field as ArrowField, Fields as ArrowFields,
    Schema as ArrowSchema, SchemaRef,
};
use risingwave_common::array::arrow::{IcebergArrowConvert, IcebergCreateTableArrowConvert};
use risingwave_common::array::{Op, StreamChunk};
use risingwave_common::bail;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{Field, Schema};
use risingwave_common::error::IcebergError;
use risingwave_common::metrics::{LabelGuardedHistogram, LabelGuardedIntCounter};
use risingwave_common_estimate_size::EstimateSize;
use risingwave_pb::connector_service::SinkMetadata;
use risingwave_pb::connector_service::sink_metadata::Metadata::Serialized;
use risingwave_pb::connector_service::sink_metadata::SerializedMetadata;
use serde::{Deserialize, Serialize};
use serde_json::from_value;
use serde_with::{DisplayFromStr, serde_as};
use thiserror_ext::AsReport;
use tokio::sync::mpsc::UnboundedSender;
use tokio_retry::Retry;
use tokio_retry::strategy::{ExponentialBackoff, jitter};
use tracing::warn;
use url::Url;
use uuid::Uuid;
use with_options::WithOptions;

use super::decouple_checkpoint_log_sink::iceberg_default_commit_checkpoint_interval;
use super::{
    GLOBAL_SINK_METRICS, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT, Sink,
    SinkError, SinkWriterParam,
};
use crate::connector_common::{IcebergCommon, IcebergSinkCompactionUpdate, IcebergTableIdentifier};
use crate::enforce_secret::EnforceSecret;
use crate::sink::catalog::SinkId;
use crate::sink::coordinate::CoordinatedLogSinker;
use crate::sink::writer::SinkWriter;
use crate::sink::{
    Result, SinglePhaseCommitCoordinator, SinkCommitCoordinator, SinkParam,
    TwoPhaseCommitCoordinator,
};
use crate::{deserialize_bool_from_string, deserialize_optional_string_seq_from_string};

pub const ICEBERG_SINK: &str = "iceberg";
pub const ICEBERG_COW_BRANCH: &str = "ingestion";
pub const ICEBERG_WRITE_MODE_MERGE_ON_READ: &str = "merge-on-read";
pub const ICEBERG_WRITE_MODE_COPY_ON_WRITE: &str = "copy-on-write";
pub const ICEBERG_COMPACTION_TYPE_FULL: &str = "full";
pub const ICEBERG_COMPACTION_TYPE_SMALL_FILES: &str = "small-files";
pub const ICEBERG_COMPACTION_TYPE_FILES_WITH_DELETE: &str = "files-with-delete";
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
pub enum IcebergWriteMode {
    #[default]
    MergeOnRead,
    CopyOnWrite,
}

impl IcebergWriteMode {
    pub fn as_str(self) -> &'static str {
        match self {
            IcebergWriteMode::MergeOnRead => ICEBERG_WRITE_MODE_MERGE_ON_READ,
            IcebergWriteMode::CopyOnWrite => ICEBERG_WRITE_MODE_COPY_ON_WRITE,
        }
    }
}

impl std::str::FromStr for IcebergWriteMode {
    type Err = SinkError;

    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
        match s {
            ICEBERG_WRITE_MODE_MERGE_ON_READ => Ok(IcebergWriteMode::MergeOnRead),
            ICEBERG_WRITE_MODE_COPY_ON_WRITE => Ok(IcebergWriteMode::CopyOnWrite),
            _ => Err(SinkError::Config(anyhow!(format!(
                "invalid write_mode: {}, must be one of: {}, {}",
                s, ICEBERG_WRITE_MODE_MERGE_ON_READ, ICEBERG_WRITE_MODE_COPY_ON_WRITE
            )))),
        }
    }
}

impl TryFrom<&str> for IcebergWriteMode {
    type Error = <Self as std::str::FromStr>::Err;

    fn try_from(value: &str) -> std::result::Result<Self, Self::Error> {
        value.parse()
    }
}

impl TryFrom<String> for IcebergWriteMode {
    type Error = <Self as std::str::FromStr>::Err;

    fn try_from(value: String) -> std::result::Result<Self, Self::Error> {
        value.as_str().parse()
    }
}

impl std::fmt::Display for IcebergWriteMode {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(self.as_str())
    }
}

// Configuration constants
pub const ENABLE_COMPACTION: &str = "enable_compaction";
pub const COMPACTION_INTERVAL_SEC: &str = "compaction_interval_sec";
pub const ENABLE_SNAPSHOT_EXPIRATION: &str = "enable_snapshot_expiration";
pub const WRITE_MODE: &str = "write_mode";
pub const SNAPSHOT_EXPIRATION_RETAIN_LAST: &str = "snapshot_expiration_retain_last";
pub const SNAPSHOT_EXPIRATION_MAX_AGE_MILLIS: &str = "snapshot_expiration_max_age_millis";
pub const SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_FILES: &str = "snapshot_expiration_clear_expired_files";
pub const SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_META_DATA: &str =
    "snapshot_expiration_clear_expired_meta_data";
pub const COMPACTION_MAX_SNAPSHOTS_NUM: &str = "compaction.max_snapshots_num";

pub const COMPACTION_SMALL_FILES_THRESHOLD_MB: &str = "compaction.small_files_threshold_mb";

pub const COMPACTION_DELETE_FILES_COUNT_THRESHOLD: &str = "compaction.delete_files_count_threshold";

pub const COMPACTION_TRIGGER_SNAPSHOT_COUNT: &str = "compaction.trigger_snapshot_count";

pub const COMPACTION_TARGET_FILE_SIZE_MB: &str = "compaction.target_file_size_mb";

pub const COMPACTION_TYPE: &str = "compaction.type";

fn default_commit_retry_num() -> u32 {
    8
}

fn default_iceberg_write_mode() -> IcebergWriteMode {
    IcebergWriteMode::MergeOnRead
}

fn default_true() -> bool {
    true
}

fn default_some_true() -> Option<bool> {
    Some(true)
}

/// Compaction type for Iceberg sink
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(rename_all = "kebab-case")]
pub enum CompactionType {
    /// Full compaction - rewrites all data files
    #[default]
    Full,
    /// Small files compaction - only compact small files
    SmallFiles,
    /// Files with delete compaction - only compact files that have associated delete files
    FilesWithDelete,
}

impl CompactionType {
    pub fn as_str(&self) -> &'static str {
        match self {
            CompactionType::Full => ICEBERG_COMPACTION_TYPE_FULL,
            CompactionType::SmallFiles => ICEBERG_COMPACTION_TYPE_SMALL_FILES,
            CompactionType::FilesWithDelete => ICEBERG_COMPACTION_TYPE_FILES_WITH_DELETE,
        }
    }
}

impl std::str::FromStr for CompactionType {
    type Err = SinkError;

    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
        match s {
            ICEBERG_COMPACTION_TYPE_FULL => Ok(CompactionType::Full),
            ICEBERG_COMPACTION_TYPE_SMALL_FILES => Ok(CompactionType::SmallFiles),
            ICEBERG_COMPACTION_TYPE_FILES_WITH_DELETE => Ok(CompactionType::FilesWithDelete),
            _ => Err(SinkError::Config(anyhow!(format!(
                "invalid compaction_type: {}, must be one of: {}, {}, {}",
                s,
                ICEBERG_COMPACTION_TYPE_FULL,
                ICEBERG_COMPACTION_TYPE_SMALL_FILES,
                ICEBERG_COMPACTION_TYPE_FILES_WITH_DELETE
            )))),
        }
    }
}

impl TryFrom<&str> for CompactionType {
    type Error = <Self as std::str::FromStr>::Err;

    fn try_from(value: &str) -> std::result::Result<Self, Self::Error> {
        value.parse()
    }
}

impl TryFrom<String> for CompactionType {
    type Error = <Self as std::str::FromStr>::Err;

    fn try_from(value: String) -> std::result::Result<Self, Self::Error> {
        value.as_str().parse()
    }
}

impl std::fmt::Display for CompactionType {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}", self.as_str())
    }
}
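
// Illustrative only: a small sketch (not part of the sink logic, module name is ours) showing
// how the string conversions defined above are expected to behave, based solely on the
// constants and the `FromStr`/`TryFrom`/`Display` impls in this module.
#[cfg(test)]
mod mode_string_conversion_example {
    use super::*;

    #[test]
    fn parse_write_mode_and_compaction_type() {
        assert_eq!(
            "merge-on-read".parse::<IcebergWriteMode>().unwrap(),
            IcebergWriteMode::MergeOnRead
        );
        assert_eq!(
            IcebergWriteMode::try_from("copy-on-write").unwrap(),
            IcebergWriteMode::CopyOnWrite
        );
        assert!("invalid-mode".parse::<IcebergWriteMode>().is_err());

        assert_eq!(
            CompactionType::try_from("small-files".to_owned()).unwrap(),
            CompactionType::SmallFiles
        );
        assert_eq!(CompactionType::Full.to_string(), "full");
    }
}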

#[serde_as]
#[derive(Debug, Clone, PartialEq, Eq, Deserialize, WithOptions)]
pub struct IcebergConfig {
    pub r#type: String, // accept "append-only" or "upsert"

    #[serde(default, deserialize_with = "deserialize_bool_from_string")]
    pub force_append_only: bool,

    #[serde(flatten)]
    common: IcebergCommon,

    #[serde(flatten)]
    table: IcebergTableIdentifier,

    #[serde(
        rename = "primary_key",
        default,
        deserialize_with = "deserialize_optional_string_seq_from_string"
    )]
    pub primary_key: Option<Vec<String>>,

    // Properties passed through to the Java catalog.
    #[serde(skip)]
    pub java_catalog_props: HashMap<String, String>,

    #[serde(default)]
    pub partition_by: Option<String>,

    /// Commit every n (> 0) checkpoints; the default is 60.
    #[serde(default = "iceberg_default_commit_checkpoint_interval")]
    #[serde_as(as = "DisplayFromStr")]
    #[with_option(allow_alter_on_fly)]
    pub commit_checkpoint_interval: u64,

    #[serde(default, deserialize_with = "deserialize_bool_from_string")]
    pub create_table_if_not_exists: bool,

    /// Whether to commit with `exactly_once` semantics; the default is true.
    #[serde(default = "default_some_true")]
    #[serde_as(as = "Option<DisplayFromStr>")]
    pub is_exactly_once: Option<bool>,
    // Number of retries when an Iceberg commit fails; the default is 8.
    // # TODO
    // The Iceberg table may store the commit retry num in its table metadata.
    // We should try to find and use that as the default commit retry num first.
    #[serde(default = "default_commit_retry_num")]
    pub commit_retry_num: u32,

    /// Whether to enable Iceberg compaction.
    #[serde(
        rename = "enable_compaction",
        default,
        deserialize_with = "deserialize_bool_from_string"
    )]
    #[with_option(allow_alter_on_fly)]
    pub enable_compaction: bool,

    /// The interval (in seconds) of Iceberg compaction.
    #[serde(rename = "compaction_interval_sec", default)]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    pub compaction_interval_sec: Option<u64>,

    /// Whether to enable Iceberg snapshot expiration.
    #[serde(
        rename = "enable_snapshot_expiration",
        default,
        deserialize_with = "deserialize_bool_from_string"
    )]
    #[with_option(allow_alter_on_fly)]
    pub enable_snapshot_expiration: bool,

    /// The Iceberg write mode, either `merge-on-read` or `copy-on-write`.
    #[serde(rename = "write_mode", default = "default_iceberg_write_mode")]
    pub write_mode: IcebergWriteMode,

    /// The maximum age (in milliseconds) for snapshots before they expire.
    /// For example, if set to 3600000, snapshots older than 1 hour will be expired.
    #[serde(rename = "snapshot_expiration_max_age_millis", default)]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    pub snapshot_expiration_max_age_millis: Option<i64>,

    /// The number of snapshots to retain.
    #[serde(rename = "snapshot_expiration_retain_last", default)]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    pub snapshot_expiration_retain_last: Option<i32>,

    #[serde(
        rename = "snapshot_expiration_clear_expired_files",
        default = "default_true",
        deserialize_with = "deserialize_bool_from_string"
    )]
    #[with_option(allow_alter_on_fly)]
    pub snapshot_expiration_clear_expired_files: bool,

    #[serde(
        rename = "snapshot_expiration_clear_expired_meta_data",
        default = "default_true",
        deserialize_with = "deserialize_bool_from_string"
    )]
    #[with_option(allow_alter_on_fly)]
    pub snapshot_expiration_clear_expired_meta_data: bool,

    /// The maximum number of snapshots allowed since the last rewrite operation.
    /// If set, the sink checks the snapshot count and waits if it is exceeded.
    #[serde(rename = "compaction.max_snapshots_num", default)]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    pub max_snapshots_num_before_compaction: Option<usize>,

    #[serde(rename = "compaction.small_files_threshold_mb", default)]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    pub small_files_threshold_mb: Option<u64>,

    #[serde(rename = "compaction.delete_files_count_threshold", default)]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    pub delete_files_count_threshold: Option<usize>,

    #[serde(rename = "compaction.trigger_snapshot_count", default)]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    pub trigger_snapshot_count: Option<usize>,

    #[serde(rename = "compaction.target_file_size_mb", default)]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    pub target_file_size_mb: Option<u64>,

    /// Compaction type: `full`, `small-files`, or `files-with-delete`.
    /// If not set, defaults to `full`.
    #[serde(rename = "compaction.type", default)]
    #[with_option(allow_alter_on_fly)]
    pub compaction_type: Option<CompactionType>,
}

impl EnforceSecret for IcebergConfig {
    fn enforce_secret<'a>(
        prop_iter: impl Iterator<Item = &'a str>,
    ) -> crate::error::ConnectorResult<()> {
        for prop in prop_iter {
            IcebergCommon::enforce_one(prop)?;
        }
        Ok(())
    }

    fn enforce_one(prop: &str) -> crate::error::ConnectorResult<()> {
        IcebergCommon::enforce_one(prop)
    }
}

impl IcebergConfig {
    pub fn from_btreemap(values: BTreeMap<String, String>) -> Result<Self> {
        let mut config =
            serde_json::from_value::<IcebergConfig>(serde_json::to_value(&values).unwrap())
                .map_err(|e| SinkError::Config(anyhow!(e)))?;

        if config.r#type != SINK_TYPE_APPEND_ONLY && config.r#type != SINK_TYPE_UPSERT {
            return Err(SinkError::Config(anyhow!(
                "`{}` must be {}, or {}",
                SINK_TYPE_OPTION,
                SINK_TYPE_APPEND_ONLY,
                SINK_TYPE_UPSERT
            )));
        }

        if config.r#type == SINK_TYPE_UPSERT {
            if let Some(primary_key) = &config.primary_key {
                if primary_key.is_empty() {
                    return Err(SinkError::Config(anyhow!(
                        "`primary-key` must not be empty in {}",
                        SINK_TYPE_UPSERT
                    )));
                }
            } else {
                return Err(SinkError::Config(anyhow!(
                    "Must set `primary-key` in {}",
                    SINK_TYPE_UPSERT
                )));
            }
        }

        // All configs starting with "catalog." are treated as Java catalog configs.
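        // For example, a hypothetical property `catalog.some-option` would be forwarded to the
        // Java catalog as `some-option`, while the reserved keys excluded below stay in the
        // typed config above.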
        config.java_catalog_props = values
            .iter()
            .filter(|(k, _v)| {
                k.starts_with("catalog.")
                    && k != &"catalog.uri"
                    && k != &"catalog.type"
                    && k != &"catalog.name"
                    && k != &"catalog.header"
            })
            .map(|(k, v)| (k[8..].to_string(), v.clone()))
            .collect();

        if config.commit_checkpoint_interval == 0 {
            return Err(SinkError::Config(anyhow!(
                "`commit-checkpoint-interval` must be greater than 0"
            )));
        }

        Ok(config)
    }

    pub fn catalog_type(&self) -> &str {
        self.common.catalog_type()
    }

    pub async fn load_table(&self) -> Result<Table> {
        self.common
            .load_table(&self.table, &self.java_catalog_props)
            .await
            .map_err(Into::into)
    }

    pub async fn create_catalog(&self) -> Result<Arc<dyn Catalog>> {
        self.common
            .create_catalog(&self.java_catalog_props)
            .await
            .map_err(Into::into)
    }

    pub fn full_table_name(&self) -> Result<TableIdent> {
        self.table.to_table_ident().map_err(Into::into)
    }

    pub fn catalog_name(&self) -> String {
        self.common.catalog_name()
    }

    pub fn compaction_interval_sec(&self) -> u64 {
        // Defaults to 1 hour.
        self.compaction_interval_sec.unwrap_or(3600)
    }

    /// Calculate the timestamp (in milliseconds) before which snapshots should be expired.
    /// Returns `current_time_ms` - `max_age_millis`.
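    /// For example, with `current_time_ms = 10_000_000` and a configured max age of
    /// `3_600_000` ms, this returns `Some(6_400_000)`; it is `None` when no max age is set.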
    pub fn snapshot_expiration_timestamp_ms(&self, current_time_ms: i64) -> Option<i64> {
        self.snapshot_expiration_max_age_millis
            .map(|max_age_millis| current_time_ms - max_age_millis)
    }

    pub fn trigger_snapshot_count(&self) -> usize {
        self.trigger_snapshot_count.unwrap_or(16)
    }

    pub fn small_files_threshold_mb(&self) -> u64 {
        self.small_files_threshold_mb.unwrap_or(64)
    }

    pub fn delete_files_count_threshold(&self) -> usize {
        self.delete_files_count_threshold.unwrap_or(256)
    }

    pub fn target_file_size_mb(&self) -> u64 {
        self.target_file_size_mb.unwrap_or(1024)
    }

    /// Get the compaction type, defaulting to `Full` when not configured.
    pub fn compaction_type(&self) -> CompactionType {
        self.compaction_type.unwrap_or_default()
    }
}

pub struct IcebergSink {
    pub config: IcebergConfig,
    param: SinkParam,
    // In upsert mode, this is never `None` or empty.
    unique_column_ids: Option<Vec<usize>>,
}

impl EnforceSecret for IcebergSink {
    fn enforce_secret<'a>(
        prop_iter: impl Iterator<Item = &'a str>,
    ) -> crate::error::ConnectorResult<()> {
        for prop in prop_iter {
            IcebergConfig::enforce_one(prop)?;
        }
        Ok(())
    }
}

impl TryFrom<SinkParam> for IcebergSink {
    type Error = SinkError;

    fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
        let config = IcebergConfig::from_btreemap(param.properties.clone())?;
        IcebergSink::new(config, param)
    }
}

impl Debug for IcebergSink {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("IcebergSink")
            .field("config", &self.config)
            .finish()
    }
}

async fn create_and_validate_table_impl(
    config: &IcebergConfig,
    param: &SinkParam,
) -> Result<Table> {
    if config.create_table_if_not_exists {
        create_table_if_not_exists_impl(config, param).await?;
    }

    let table = config
        .load_table()
        .await
        .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;

    let sink_schema = param.schema();
    let iceberg_arrow_schema = schema_to_arrow_schema(table.metadata().current_schema())
        .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;

    try_matches_arrow_schema(&sink_schema, &iceberg_arrow_schema)
        .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;

    Ok(table)
}

async fn create_table_if_not_exists_impl(config: &IcebergConfig, param: &SinkParam) -> Result<()> {
    let catalog = config.create_catalog().await?;
    let namespace = if let Some(database_name) = config.table.database_name() {
        let namespace = NamespaceIdent::new(database_name.to_owned());
        if !catalog
            .namespace_exists(&namespace)
            .await
            .map_err(|e| SinkError::Iceberg(anyhow!(e)))?
        {
            catalog
                .create_namespace(&namespace, HashMap::default())
                .await
                .map_err(|e| SinkError::Iceberg(anyhow!(e)))
                .context("failed to create iceberg namespace")?;
        }
        namespace
    } else {
        bail!("database name must be set if you want to create table")
    };

    let table_id = config
        .full_table_name()
        .context("Unable to parse table name")?;
    if !catalog
        .table_exists(&table_id)
        .await
        .map_err(|e| SinkError::Iceberg(anyhow!(e)))?
    {
        let iceberg_create_table_arrow_convert = IcebergCreateTableArrowConvert::default();
        // convert risingwave schema -> arrow schema -> iceberg schema
        let arrow_fields = param
            .columns
            .iter()
            .map(|column| {
                Ok(iceberg_create_table_arrow_convert
                    .to_arrow_field(&column.name, &column.data_type)
                    .map_err(|e| SinkError::Iceberg(anyhow!(e)))
                    .context(format!(
                        "failed to convert {}: {} to arrow type",
                        &column.name, &column.data_type
                    ))?)
            })
            .collect::<Result<Vec<ArrowField>>>()?;
        let arrow_schema = arrow_schema_iceberg::Schema::new(arrow_fields);
        let iceberg_schema = iceberg::arrow::arrow_schema_to_schema(&arrow_schema)
            .map_err(|e| SinkError::Iceberg(anyhow!(e)))
            .context("failed to convert arrow schema to iceberg schema")?;

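        // Derive the table location from the warehouse path. For example (illustrative values),
        // a warehouse path of `s3://bucket/warehouse` and a table `db.tbl` yields
        // `s3://bucket/warehouse/db/tbl`; REST catalogs may instead leave the location unset
        // and let the catalog assign it.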
        let location = {
            let mut names = namespace.clone().inner();
            names.push(config.table.table_name().to_owned());
            match &config.common.warehouse_path {
                Some(warehouse_path) => {
                    let is_s3_tables = warehouse_path.starts_with("arn:aws:s3tables");
                    let url = Url::parse(warehouse_path);
                    if url.is_err() || is_s3_tables {
                        // For rest catalog, the warehouse_path could be a warehouse name.
                        // In this case, we should not specify the location when creating a table.
                        if config.common.catalog_type() == "rest"
                            || config.common.catalog_type() == "rest_rust"
                        {
                            None
                        } else {
                            bail!(format!("Invalid warehouse path: {}", warehouse_path))
                        }
                    } else if warehouse_path.ends_with('/') {
                        Some(format!("{}{}", warehouse_path, names.join("/")))
                    } else {
                        Some(format!("{}/{}", warehouse_path, names.join("/")))
                    }
                }
                None => None,
            }
        };

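        // `partition_by` is parsed by `parse_partition_by_exprs` (defined elsewhere in this
        // file) into `(column, transform)` pairs; each pair becomes an unbound partition
        // field named `_p_<column>` below.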
        let partition_spec = match &config.partition_by {
            Some(partition_by) => {
                let mut partition_fields = Vec::<UnboundPartitionField>::new();
                for (i, (column, transform)) in parse_partition_by_exprs(partition_by.clone())?
                    .into_iter()
                    .enumerate()
                {
                    match iceberg_schema.field_id_by_name(&column) {
                        Some(id) => partition_fields.push(
                            UnboundPartitionField::builder()
                                .source_id(id)
                                .transform(transform)
                                .name(format!("_p_{}", column))
                                .field_id(i as i32)
                                .build(),
                        ),
                        None => bail!(format!(
                            "Partition source column does not exist in schema: {}",
                            column
                        )),
                    };
                }
                Some(
                    UnboundPartitionSpec::builder()
                        .with_spec_id(0)
                        .add_partition_fields(partition_fields)
                        .map_err(|e| SinkError::Iceberg(anyhow!(e)))
                        .context("failed to add partition columns")?
                        .build(),
                )
            }
            None => None,
        };

        let table_creation_builder = TableCreation::builder()
            .name(config.table.table_name().to_owned())
            .schema(iceberg_schema);

        let table_creation = match (location, partition_spec) {
            (Some(location), Some(partition_spec)) => table_creation_builder
                .location(location)
                .partition_spec(partition_spec)
                .build(),
            (Some(location), None) => table_creation_builder.location(location).build(),
            (None, Some(partition_spec)) => table_creation_builder
                .partition_spec(partition_spec)
                .build(),
            (None, None) => table_creation_builder.build(),
        };

        catalog
            .create_table(&namespace, table_creation)
            .await
            .map_err(|e| SinkError::Iceberg(anyhow!(e)))
            .context("failed to create iceberg table")?;
    }
    Ok(())
}

impl IcebergSink {
    pub async fn create_and_validate_table(&self) -> Result<Table> {
        create_and_validate_table_impl(&self.config, &self.param).await
    }

    pub async fn create_table_if_not_exists(&self) -> Result<()> {
        create_table_if_not_exists_impl(&self.config, &self.param).await
    }

    pub fn new(config: IcebergConfig, param: SinkParam) -> Result<Self> {
        let unique_column_ids = if config.r#type == SINK_TYPE_UPSERT && !config.force_append_only {
            if let Some(pk) = &config.primary_key {
                let mut unique_column_ids = Vec::with_capacity(pk.len());
                for col_name in pk {
                    let id = param
                        .columns
                        .iter()
                        .find(|col| col.name.as_str() == col_name)
                        .ok_or_else(|| {
                            SinkError::Config(anyhow!(
                                "Primary key column {} not found in sink schema",
                                col_name
                            ))
                        })?
                        .column_id
                        .get_id() as usize;
                    unique_column_ids.push(id);
                }
                Some(unique_column_ids)
            } else {
                unreachable!()
            }
        } else {
            None
        };
        Ok(Self {
            config,
            param,
            unique_column_ids,
        })
    }
}

impl Sink for IcebergSink {
    type LogSinker = CoordinatedLogSinker<IcebergSinkWriter>;

    const SINK_NAME: &'static str = ICEBERG_SINK;

    async fn validate(&self) -> Result<()> {
        if "snowflake".eq_ignore_ascii_case(self.config.catalog_type()) {
            bail!("Snowflake catalog only supports iceberg sources");
        }

        if "glue".eq_ignore_ascii_case(self.config.catalog_type()) {
            risingwave_common::license::Feature::IcebergSinkWithGlue
                .check_available()
                .map_err(|e| anyhow::anyhow!(e))?;
        }

        // Validate compaction type configuration
        let compaction_type = self.config.compaction_type();

        // Check COW mode constraints
        // COW mode only supports 'full' compaction type
        if self.config.write_mode == IcebergWriteMode::CopyOnWrite
            && compaction_type != CompactionType::Full
        {
            bail!(
                "'copy-on-write' mode only supports 'full' compaction type, got: '{}'",
                compaction_type
            );
        }

        match compaction_type {
            CompactionType::SmallFiles => {
                // 1. check license
                risingwave_common::license::Feature::IcebergCompaction
                    .check_available()
                    .map_err(|e| anyhow::anyhow!(e))?;

                // 2. check write mode
                if self.config.write_mode != IcebergWriteMode::MergeOnRead {
                    bail!(
                        "'small-files' compaction type only supports 'merge-on-read' write mode, got: '{}'",
                        self.config.write_mode
                    );
                }

                // 3. check conflicting parameters
                if self.config.delete_files_count_threshold.is_some() {
                    bail!(
                        "`compaction.delete-files-count-threshold` is not supported for 'small-files' compaction type"
                    );
                }
            }
            CompactionType::FilesWithDelete => {
                // 1. check license
                risingwave_common::license::Feature::IcebergCompaction
                    .check_available()
                    .map_err(|e| anyhow::anyhow!(e))?;

                // 2. check write mode
                if self.config.write_mode != IcebergWriteMode::MergeOnRead {
                    bail!(
                        "'files-with-delete' compaction type only supports 'merge-on-read' write mode, got: '{}'",
                        self.config.write_mode
                    );
                }

                // 3. check conflicting parameters
                if self.config.small_files_threshold_mb.is_some() {
                    bail!(
                        "`compaction.small-files-threshold-mb` must not be set for 'files-with-delete' compaction type"
                    );
                }
            }
            CompactionType::Full => {
                // Full compaction has no special requirements
            }
        }

        let _ = self.create_and_validate_table().await?;
        Ok(())
    }

    fn validate_alter_config(config: &BTreeMap<String, String>) -> Result<()> {
        let iceberg_config = IcebergConfig::from_btreemap(config.clone())?;

        // Validate compaction interval
        if let Some(compaction_interval) = iceberg_config.compaction_interval_sec {
            if iceberg_config.enable_compaction && compaction_interval == 0 {
                bail!(
                    "`compaction-interval-sec` must be greater than 0 when `enable-compaction` is true"
                );
            }

            // log compaction_interval
            tracing::info!(
                "Alter config compaction_interval set to {} seconds",
                compaction_interval
            );
        }

        // Validate max snapshots
        if let Some(max_snapshots) = iceberg_config.max_snapshots_num_before_compaction
            && max_snapshots < 1
        {
            bail!(
                "`compaction.max-snapshots-num` must be greater than 0, got: {}",
                max_snapshots
            );
        }

        Ok(())
    }

    async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
        let writer = IcebergSinkWriter::new(
            self.config.clone(),
            self.param.clone(),
            writer_param.clone(),
            self.unique_column_ids.clone(),
        );

        let commit_checkpoint_interval =
            NonZeroU64::new(self.config.commit_checkpoint_interval).expect(
                "commit_checkpoint_interval should be greater than 0, and it should be checked in config validation",
            );
        let log_sinker = CoordinatedLogSinker::new(
            &writer_param,
            self.param.clone(),
            writer,
            commit_checkpoint_interval,
        )
        .await?;

        Ok(log_sinker)
    }

    fn is_coordinated_sink(&self) -> bool {
        true
    }

    async fn new_coordinator(
        &self,
        iceberg_compact_stat_sender: Option<UnboundedSender<IcebergSinkCompactionUpdate>>,
    ) -> Result<SinkCommitCoordinator> {
        let catalog = self.config.create_catalog().await?;
        let table = self.create_and_validate_table().await?;
        let coordinator = IcebergSinkCommitter {
            catalog,
            table,
            last_commit_epoch: 0,
            sink_id: self.param.sink_id,
            config: self.config.clone(),
            param: self.param.clone(),
            commit_retry_num: self.config.commit_retry_num,
            iceberg_compact_stat_sender,
        };
        if self.config.is_exactly_once.unwrap_or_default() {
            Ok(SinkCommitCoordinator::TwoPhase(Box::new(coordinator)))
        } else {
            Ok(SinkCommitCoordinator::SinglePhase(Box::new(coordinator)))
        }
    }
}

/// `None` means no projection.
/// `Prepare` holds the index of the extra partition column.
/// `Done` holds the projection index vec.
///
/// `ProjectIdxVec` is evaluated lazily: when the `Prepare` state is first encountered, the data
/// chunk schema is used to build the projection index vec.
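///
/// For example (illustrative values), with 4 columns and an extra partition column at
/// index 2, `Prepare(2)` is evaluated to `Done(vec![0, 1, 3])` on the first chunk.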
enum ProjectIdxVec {
    None,
    Prepare(usize),
    Done(Vec<usize>),
}

type DataFileWriterBuilderType =
    DataFileWriterBuilder<ParquetWriterBuilder, DefaultLocationGenerator, DefaultFileNameGenerator>;
type PositionDeleteFileWriterBuilderType = PositionDeleteFileWriterBuilder<
    ParquetWriterBuilder,
    DefaultLocationGenerator,
    DefaultFileNameGenerator,
>;
type EqualityDeleteFileWriterBuilderType = EqualityDeleteFileWriterBuilder<
    ParquetWriterBuilder,
    DefaultLocationGenerator,
    DefaultFileNameGenerator,
>;

#[derive(Clone)]
struct TaskWriterBuilderWrapper<B: IcebergWriterBuilder> {
    inner: B,
    fanout_enabled: bool,
    schema: IcebergSchemaRef,
    partition_spec: PartitionSpecRef,
    compute_partition: bool,
}

impl<B: IcebergWriterBuilder> TaskWriterBuilderWrapper<B> {
    fn new(
        inner: B,
        fanout_enabled: bool,
        schema: IcebergSchemaRef,
        partition_spec: PartitionSpecRef,
        compute_partition: bool,
    ) -> Self {
        Self {
            inner,
            fanout_enabled,
            schema,
            partition_spec,
            compute_partition,
        }
    }

    fn build(self) -> iceberg::Result<TaskWriter<B>> {
        let partition_splitter = match (
            self.partition_spec.is_unpartitioned(),
            self.compute_partition,
        ) {
            (true, _) => None,
            (false, true) => Some(RecordBatchPartitionSplitter::new_with_computed_values(
                self.schema.clone(),
                self.partition_spec.clone(),
            )?),
            (false, false) => Some(RecordBatchPartitionSplitter::new_with_precomputed_values(
                self.schema.clone(),
                self.partition_spec.clone(),
            )?),
        };

        Ok(TaskWriter::new_with_partition_splitter(
            self.inner,
            self.fanout_enabled,
            self.schema,
            self.partition_spec,
            partition_splitter,
        ))
    }
}

pub enum IcebergSinkWriter {
    Created(IcebergSinkWriterArgs),
    Initialized(IcebergSinkWriterInner),
}

pub struct IcebergSinkWriterArgs {
    config: IcebergConfig,
    sink_param: SinkParam,
    writer_param: SinkWriterParam,
    unique_column_ids: Option<Vec<usize>>,
}

pub struct IcebergSinkWriterInner {
    writer: IcebergWriterDispatch,
    arrow_schema: SchemaRef,
    // See comments below
    metrics: IcebergWriterMetrics,
    // State of iceberg table for this writer
    table: Table,
    // For chunks with an extra partition column, we should remove that column before writing.
    // This projection index vec is cached to avoid recomputing it for every chunk.
    project_idx_vec: ProjectIdxVec,
}

#[allow(clippy::type_complexity)]
enum IcebergWriterDispatch {
    Append {
        writer: Option<Box<dyn IcebergWriter>>,
        writer_builder:
            TaskWriterBuilderWrapper<MonitoredGeneralWriterBuilder<DataFileWriterBuilderType>>,
    },
    Upsert {
        writer: Option<Box<dyn IcebergWriter>>,
        writer_builder: TaskWriterBuilderWrapper<
            MonitoredGeneralWriterBuilder<
                DeltaWriterBuilder<
                    DataFileWriterBuilderType,
                    PositionDeleteFileWriterBuilderType,
                    EqualityDeleteFileWriterBuilderType,
                >,
            >,
        >,
        arrow_schema_with_op_column: SchemaRef,
    },
}

impl IcebergWriterDispatch {
    pub fn get_writer(&mut self) -> Option<&mut Box<dyn IcebergWriter>> {
        match self {
            IcebergWriterDispatch::Append { writer, .. }
            | IcebergWriterDispatch::Upsert { writer, .. } => writer.as_mut(),
        }
    }
}

pub struct IcebergWriterMetrics {
    // NOTE: These 2 metrics are not used directly by us, but only kept for lifecycle management.
    // They are actually used in `PrometheusWriterBuilder`:
    //     WriterMetrics::new(write_qps.deref().clone(), write_latency.deref().clone())
    // We keep them here so that the guards clean up the labels from the metrics registry when dropped.
    _write_qps: LabelGuardedIntCounter,
    _write_latency: LabelGuardedHistogram,
    write_bytes: LabelGuardedIntCounter,
}

impl IcebergSinkWriter {
    pub fn new(
        config: IcebergConfig,
        sink_param: SinkParam,
        writer_param: SinkWriterParam,
        unique_column_ids: Option<Vec<usize>>,
    ) -> Self {
        Self::Created(IcebergSinkWriterArgs {
            config,
            sink_param,
            writer_param,
            unique_column_ids,
        })
    }
}

impl IcebergSinkWriterInner {
    fn build_append_only(table: Table, writer_param: &SinkWriterParam) -> Result<Self> {
        let SinkWriterParam {
            extra_partition_col_idx,
            actor_id,
            sink_id,
            sink_name,
            ..
        } = writer_param;
        let metrics_labels = [
            &actor_id.to_string(),
            &sink_id.to_string(),
            sink_name.as_str(),
        ];

        // Metrics
        let write_qps = GLOBAL_SINK_METRICS
            .iceberg_write_qps
            .with_guarded_label_values(&metrics_labels);
        let write_latency = GLOBAL_SINK_METRICS
            .iceberg_write_latency
            .with_guarded_label_values(&metrics_labels);
        // # TODO
        // Unused. Add this metric later.
        let _rolling_unflushed_data_file = GLOBAL_SINK_METRICS
            .iceberg_rolling_unflushed_data_file
            .with_guarded_label_values(&metrics_labels);
        let write_bytes = GLOBAL_SINK_METRICS
            .iceberg_write_bytes
            .with_guarded_label_values(&metrics_labels);

        let schema = table.metadata().current_schema();
        let partition_spec = table.metadata().default_partition_spec();
        let fanout_enabled = !partition_spec.fields().is_empty();

        // To avoid duplicate file names, each sink creation generates a unique UUID as the file name suffix.
        let unique_uuid_suffix = Uuid::now_v7();

        let parquet_writer_properties = WriterProperties::builder()
            .set_max_row_group_size(
                writer_param
                    .streaming_config
                    .developer
                    .iceberg_sink_write_parquet_max_row_group_rows,
            )
            .build();

        let parquet_writer_builder =
            ParquetWriterBuilder::new(parquet_writer_properties, schema.clone());
        let rolling_builder = RollingFileWriterBuilder::new_with_default_file_size(
            parquet_writer_builder,
            table.file_io().clone(),
            DefaultLocationGenerator::new(table.metadata().clone())
                .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
            DefaultFileNameGenerator::new(
                writer_param.actor_id.to_string(),
                Some(unique_uuid_suffix.to_string()),
                iceberg::spec::DataFileFormat::Parquet,
            ),
        );
        let data_file_builder = DataFileWriterBuilder::new(rolling_builder);
        let monitored_builder = MonitoredGeneralWriterBuilder::new(
            data_file_builder,
            write_qps.clone(),
            write_latency.clone(),
        );
        let writer_builder = TaskWriterBuilderWrapper::new(
            monitored_builder,
            fanout_enabled,
            schema.clone(),
            partition_spec.clone(),
            true,
        );
        let inner_writer = Some(Box::new(
            writer_builder
                .clone()
                .build()
                .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
        ) as Box<dyn IcebergWriter>);
        Ok(Self {
            arrow_schema: Arc::new(
                schema_to_arrow_schema(table.metadata().current_schema())
                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
            ),
            metrics: IcebergWriterMetrics {
                _write_qps: write_qps,
                _write_latency: write_latency,
                write_bytes,
            },
            writer: IcebergWriterDispatch::Append {
                writer: inner_writer,
                writer_builder,
            },
            table,
            project_idx_vec: {
                if let Some(extra_partition_col_idx) = extra_partition_col_idx {
                    ProjectIdxVec::Prepare(*extra_partition_col_idx)
                } else {
                    ProjectIdxVec::None
                }
            },
        })
    }

    fn build_upsert(
        table: Table,
        unique_column_ids: Vec<usize>,
        writer_param: &SinkWriterParam,
    ) -> Result<Self> {
        let SinkWriterParam {
            extra_partition_col_idx,
            actor_id,
            sink_id,
            sink_name,
            ..
        } = writer_param;
        let metrics_labels = [
            &actor_id.to_string(),
            &sink_id.to_string(),
            sink_name.as_str(),
        ];
        let unique_column_ids: Vec<_> = unique_column_ids.into_iter().map(|id| id as i32).collect();

        // Metrics
        let write_qps = GLOBAL_SINK_METRICS
            .iceberg_write_qps
            .with_guarded_label_values(&metrics_labels);
        let write_latency = GLOBAL_SINK_METRICS
            .iceberg_write_latency
            .with_guarded_label_values(&metrics_labels);
        // # TODO
        // Unused. Add this metric later.
        let _rolling_unflushed_data_file = GLOBAL_SINK_METRICS
            .iceberg_rolling_unflushed_data_file
            .with_guarded_label_values(&metrics_labels);
        let write_bytes = GLOBAL_SINK_METRICS
            .iceberg_write_bytes
            .with_guarded_label_values(&metrics_labels);

        // Determine the current schema and default partition spec
        let schema = table.metadata().current_schema();
        let partition_spec = table.metadata().default_partition_spec();
        let fanout_enabled = !partition_spec.fields().is_empty();

        // To avoid duplicate file names, each sink creation generates a unique UUID as the file name suffix.
        let unique_uuid_suffix = Uuid::now_v7();

        let parquet_writer_properties = WriterProperties::builder()
            .set_max_row_group_size(
                writer_param
                    .streaming_config
                    .developer
                    .iceberg_sink_write_parquet_max_row_group_rows,
            )
            .build();

        let data_file_builder = {
            let parquet_writer_builder =
                ParquetWriterBuilder::new(parquet_writer_properties.clone(), schema.clone());
            let rolling_writer_builder = RollingFileWriterBuilder::new_with_default_file_size(
                parquet_writer_builder,
                table.file_io().clone(),
                DefaultLocationGenerator::new(table.metadata().clone())
                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
                DefaultFileNameGenerator::new(
                    writer_param.actor_id.to_string(),
                    Some(unique_uuid_suffix.to_string()),
                    iceberg::spec::DataFileFormat::Parquet,
                ),
            );
            DataFileWriterBuilder::new(rolling_writer_builder)
        };
        let position_delete_builder = {
            let parquet_writer_builder = ParquetWriterBuilder::new(
                parquet_writer_properties.clone(),
                POSITION_DELETE_SCHEMA.clone().into(),
            );
            let rolling_writer_builder = RollingFileWriterBuilder::new_with_default_file_size(
                parquet_writer_builder,
                table.file_io().clone(),
                DefaultLocationGenerator::new(table.metadata().clone())
                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
                DefaultFileNameGenerator::new(
                    writer_param.actor_id.to_string(),
                    Some(format!("pos-del-{}", unique_uuid_suffix)),
                    iceberg::spec::DataFileFormat::Parquet,
                ),
            );
            PositionDeleteFileWriterBuilder::new(rolling_writer_builder)
        };
        let equality_delete_builder = {
            let config = EqualityDeleteWriterConfig::new(
                unique_column_ids.clone(),
                table.metadata().current_schema().clone(),
            )
            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
            let parquet_writer_builder = ParquetWriterBuilder::new(
                parquet_writer_properties,
                Arc::new(
                    arrow_schema_to_schema(config.projected_arrow_schema_ref())
                        .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
                ),
            );
            let rolling_writer_builder = RollingFileWriterBuilder::new_with_default_file_size(
                parquet_writer_builder,
                table.file_io().clone(),
                DefaultLocationGenerator::new(table.metadata().clone())
                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
                DefaultFileNameGenerator::new(
                    writer_param.actor_id.to_string(),
                    Some(format!("eq-del-{}", unique_uuid_suffix)),
                    iceberg::spec::DataFileFormat::Parquet,
                ),
            );

            EqualityDeleteFileWriterBuilder::new(rolling_writer_builder, config)
        };
        let delta_builder = DeltaWriterBuilder::new(
            data_file_builder,
            position_delete_builder,
            equality_delete_builder,
            unique_column_ids,
            schema.clone(),
        );
        let original_arrow_schema = Arc::new(
            schema_to_arrow_schema(table.metadata().current_schema())
                .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
        );
        let schema_with_extra_op_column = {
            let mut new_fields = original_arrow_schema.fields().iter().cloned().collect_vec();
            new_fields.push(Arc::new(ArrowField::new(
                "op".to_owned(),
                ArrowDataType::Int32,
                false,
            )));
            Arc::new(ArrowSchema::new(new_fields))
        };
        let writer_builder = TaskWriterBuilderWrapper::new(
            MonitoredGeneralWriterBuilder::new(
                delta_builder,
                write_qps.clone(),
                write_latency.clone(),
            ),
            fanout_enabled,
            schema.clone(),
            partition_spec.clone(),
            true,
        );
        let inner_writer = Some(Box::new(
            writer_builder
                .clone()
                .build()
                .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
        ) as Box<dyn IcebergWriter>);
        Ok(Self {
            arrow_schema: original_arrow_schema,
            metrics: IcebergWriterMetrics {
                _write_qps: write_qps,
                _write_latency: write_latency,
                write_bytes,
            },
            table,
            writer: IcebergWriterDispatch::Upsert {
                writer: inner_writer,
                writer_builder,
                arrow_schema_with_op_column: schema_with_extra_op_column,
            },
            project_idx_vec: {
                if let Some(extra_partition_col_idx) = extra_partition_col_idx {
                    ProjectIdxVec::Prepare(*extra_partition_col_idx)
                } else {
                    ProjectIdxVec::None
                }
            },
        })
    }
}

#[async_trait]
impl SinkWriter for IcebergSinkWriter {
    type CommitMetadata = Option<SinkMetadata>;

    /// Begin a new epoch
    async fn begin_epoch(&mut self, _epoch: u64) -> Result<()> {
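        // Lazily initialize on the first epoch: load and validate the table, then build either
        // the upsert (delta) writer or the append-only writer. Subsequent calls take the early
        // return in the `let ... else` below and reuse the initialized writer.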
1372        let Self::Created(args) = self else {
1373            return Ok(());
1374        };
1375
1376        let table = create_and_validate_table_impl(&args.config, &args.sink_param).await?;
1377        let inner = match &args.unique_column_ids {
1378            Some(unique_column_ids) => IcebergSinkWriterInner::build_upsert(
1379                table,
1380                unique_column_ids.clone(),
1381                &args.writer_param,
1382            )?,
1383            None => IcebergSinkWriterInner::build_append_only(table, &args.writer_param)?,
1384        };
1385
1386        *self = IcebergSinkWriter::Initialized(inner);
1387        Ok(())
1388    }
1389
1390    /// Write a stream chunk to sink
1391    async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> {
1392        let Self::Initialized(inner) = self else {
1393            unreachable!("IcebergSinkWriter should be initialized before barrier");
1394        };
1395
1396        // Try to build writer if it's None.
1397        match &mut inner.writer {
1398            IcebergWriterDispatch::Append {
1399                writer,
1400                writer_builder,
1401            } => {
1402                if writer.is_none() {
1403                    *writer = Some(Box::new(
1404                        writer_builder
1405                            .clone()
1406                            .build()
1407                            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1408                    ));
1409                }
1410            }
1411            IcebergWriterDispatch::Upsert {
1412                writer,
1413                writer_builder,
1414                ..
1415            } => {
1416                if writer.is_none() {
1417                    *writer = Some(Box::new(
1418                        writer_builder
1419                            .clone()
1420                            .build()
1421                            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1422                    ));
1423                }
1424            }
1425        };
1426
1427        // Process the chunk.
1428        let (mut chunk, ops) = chunk.compact_vis().into_parts();
1429        match &mut inner.project_idx_vec {
1430            ProjectIdxVec::None => {}
1431            ProjectIdxVec::Prepare(idx) => {
1432                if *idx >= chunk.columns().len() {
1433                    return Err(SinkError::Iceberg(anyhow!(
1434                        "invalid extra partition column index {}",
1435                        idx
1436                    )));
1437                }
1438                let project_idx_vec = (0..*idx)
1439                    .chain(*idx + 1..chunk.columns().len())
1440                    .collect_vec();
1441                chunk = chunk.project(&project_idx_vec);
1442                inner.project_idx_vec = ProjectIdxVec::Done(project_idx_vec);
1443            }
1444            ProjectIdxVec::Done(idx_vec) => {
1445                chunk = chunk.project(idx_vec);
1446            }
1447        }
1448        if ops.is_empty() {
1449            return Ok(());
1450        }
1451        let write_batch_size = chunk.estimated_heap_size();
1452        let batch = match &inner.writer {
1453            IcebergWriterDispatch::Append { .. } => {
1454                // separate out insert chunk
1455                let filters =
1456                    chunk.visibility() & ops.iter().map(|op| *op == Op::Insert).collect::<Bitmap>();
1457                chunk.set_visibility(filters);
1458                IcebergArrowConvert
1459                    .to_record_batch(inner.arrow_schema.clone(), &chunk.compact_vis())
1460                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?
1461            }
1462            IcebergWriterDispatch::Upsert {
1463                arrow_schema_with_op_column,
1464                ..
1465            } => {
1466                let chunk = IcebergArrowConvert
1467                    .to_record_batch(inner.arrow_schema.clone(), &chunk)
1468                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
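                // Append an extra Int32 op column so the delta writer can route each row:
                // INSERT_OP rows become data-file inserts, DELETE_OP rows become deletes.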
1469                let ops = Arc::new(Int32Array::from(
1470                    ops.iter()
1471                        .map(|op| match op {
1472                            Op::UpdateInsert | Op::Insert => INSERT_OP,
1473                            Op::UpdateDelete | Op::Delete => DELETE_OP,
1474                        })
1475                        .collect_vec(),
1476                ));
1477                let mut columns = chunk.columns().to_vec();
1478                columns.push(ops);
1479                RecordBatch::try_new(arrow_schema_with_op_column.clone(), columns)
1480                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?
1481            }
1482        };
1483
1484        let writer = inner.writer.get_writer().unwrap();
1485        writer
1486            .write(batch)
1487            .instrument_await("iceberg_write")
1488            .await
1489            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
1490        inner.metrics.write_bytes.inc_by(write_batch_size as _);
1491        Ok(())
1492    }
1493
1494    /// Receive a barrier and mark the end of the current epoch. When `is_checkpoint` is true, the sink
1495    /// writer should commit the current epoch.
1496    async fn barrier(&mut self, is_checkpoint: bool) -> Result<Option<SinkMetadata>> {
1497        let Self::Initialized(inner) = self else {
1498            unreachable!("IcebergSinkWriter should be initialized before barrier");
1499        };
1500
1501        // Skip if this is not a checkpoint barrier
1502        if !is_checkpoint {
1503            return Ok(None);
1504        }
1505
1506        let close_result = match &mut inner.writer {
1507            IcebergWriterDispatch::Append {
1508                writer,
1509                writer_builder,
1510            } => {
1511                let close_result = match writer.take() {
1512                    Some(mut writer) => {
1513                        Some(writer.close().instrument_await("iceberg_close").await)
1514                    }
1515                    _ => None,
1516                };
1517                match writer_builder.clone().build() {
1518                    Ok(new_writer) => {
1519                        *writer = Some(Box::new(new_writer));
1520                    }
1521                    _ => {
1522                        // The writer is closed here and building a new one failed. Don't return the error,
1523                        // because the current writer may have closed successfully; just log it instead.
1524                        warn!("Failed to build new writer after close");
1525                    }
1526                }
1527                close_result
1528            }
1529            IcebergWriterDispatch::Upsert {
1530                writer,
1531                writer_builder,
1532                ..
1533            } => {
1534                let close_result = match writer.take() {
1535                    Some(mut writer) => {
1536                        Some(writer.close().instrument_await("iceberg_close").await)
1537                    }
1538                    _ => None,
1539                };
1540                match writer_builder.clone().build() {
1541                    Ok(new_writer) => {
1542                        *writer = Some(Box::new(new_writer));
1543                    }
1544                    _ => {
1545                        // The writer is closed here and building a new one failed. Don't return the error,
1546                        // because the current writer may have closed successfully; just log it instead.
1547                        warn!("Failed to build new writer after close");
1548                    }
1549                }
1550                close_result
1551            }
1552        };
1553
1554        match close_result {
1555            Some(Ok(result)) => {
1556                let format_version = inner.table.metadata().format_version();
1557                let partition_type = inner.table.metadata().default_partition_type();
1558                let data_files = result
1559                    .into_iter()
1560                    .map(|f| {
1561                        SerializedDataFile::try_from(f, partition_type, format_version)
1562                            .map_err(|err| SinkError::Iceberg(anyhow!(err)))
1563                    })
1564                    .collect::<Result<Vec<_>>>()?;
1565                Ok(Some(SinkMetadata::try_from(&IcebergCommitResult {
1566                    data_files,
1567                    schema_id: inner.table.metadata().current_schema_id(),
1568                    partition_spec_id: inner.table.metadata().default_partition_spec_id(),
1569                })?))
1570            }
1571            Some(Err(err)) => Err(SinkError::Iceberg(anyhow!(err))),
1572            None => Err(SinkError::Iceberg(anyhow!("No writer to close"))),
1573        }
1574    }
1575}
1576
1577const SCHEMA_ID: &str = "schema_id";
1578const PARTITION_SPEC_ID: &str = "partition_spec_id";
1579const DATA_FILES: &str = "data_files";
1580
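/// Per-writer commit payload passed to the coordinator via `SinkMetadata`. It is serialized
/// as a JSON object of the form `{"schema_id": .., "partition_spec_id": .., "data_files": [..]}`;
/// see the `TryFrom` impls below.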
1581#[derive(Default, Clone)]
1582struct IcebergCommitResult {
1583    schema_id: i32,
1584    partition_spec_id: i32,
1585    data_files: Vec<SerializedDataFile>,
1586}
1587
1588impl IcebergCommitResult {
1589    fn try_from(value: &SinkMetadata) -> Result<Self> {
1590        if let Some(Serialized(v)) = &value.metadata {
1591            let mut values = if let serde_json::Value::Object(v) =
1592                serde_json::from_slice::<serde_json::Value>(&v.metadata)
1593                    .context("Can't parse iceberg sink metadata")?
1594            {
1595                v
1596            } else {
1597                bail!("iceberg sink metadata should be an object");
1598            };
1599
1600            let schema_id;
1601            if let Some(serde_json::Value::Number(value)) = values.remove(SCHEMA_ID) {
1602                schema_id = value
1603                    .as_u64()
1604                    .ok_or_else(|| anyhow!("schema_id should be a u64"))?;
1605            } else {
1606                bail!("iceberg sink metadata should have schema_id");
1607            }
1608
1609            let partition_spec_id;
1610            if let Some(serde_json::Value::Number(value)) = values.remove(PARTITION_SPEC_ID) {
1611                partition_spec_id = value
1612                    .as_u64()
1613                    .ok_or_else(|| anyhow!("partition_spec_id should be a u64"))?;
1614            } else {
1615                bail!("iceberg sink metadata should have partition_spec_id");
1616            }
1617
1618            let data_files: Vec<SerializedDataFile>;
1619            if let serde_json::Value::Array(values) = values
1620                .remove(DATA_FILES)
1621                .ok_or_else(|| anyhow!("iceberg sink metadata should have data_files object"))?
1622            {
1623                data_files = values
1624                    .into_iter()
1625                    .map(from_value::<SerializedDataFile>)
1626                    .collect::<std::result::Result<_, _>>()
1627                    .context("Can't deserialize data_files in iceberg sink metadata")?;
1628            } else {
1629                bail!("iceberg sink metadata should have data_files object");
1630            }
1631
1632            Ok(Self {
1633                schema_id: schema_id as i32,
1634                partition_spec_id: partition_spec_id as i32,
1635                data_files,
1636            })
1637        } else {
1638            bail!("Can't create iceberg sink write result from empty data!")
1639        }
1640    }
1641
1642    fn try_from_serialized_bytes(value: Vec<u8>) -> Result<Self> {
1643        let mut values = if let serde_json::Value::Object(value) =
1644            serde_json::from_slice::<serde_json::Value>(&value)
1645                .context("Can't parse iceberg sink metadata")?
1646        {
1647            value
1648        } else {
1649            bail!("iceberg sink metadata should be an object");
1650        };
1651
1652        let schema_id;
1653        if let Some(serde_json::Value::Number(value)) = values.remove(SCHEMA_ID) {
1654            schema_id = value
1655                .as_u64()
1656                .ok_or_else(|| anyhow!("schema_id should be a u64"))?;
1657        } else {
1658            bail!("iceberg sink metadata should have schema_id");
1659        }
1660
1661        let partition_spec_id;
1662        if let Some(serde_json::Value::Number(value)) = values.remove(PARTITION_SPEC_ID) {
1663            partition_spec_id = value
1664                .as_u64()
1665                .ok_or_else(|| anyhow!("partition_spec_id should be a u64"))?;
1666        } else {
1667            bail!("iceberg sink metadata should have partition_spec_id");
1668        }
1669
1670        let data_files: Vec<SerializedDataFile>;
1671        if let serde_json::Value::Array(values) = values
1672            .remove(DATA_FILES)
1673            .ok_or_else(|| anyhow!("iceberg sink metadata should have data_files object"))?
1674        {
1675            data_files = values
1676                .into_iter()
1677                .map(from_value::<SerializedDataFile>)
1678                .collect::<std::result::Result<_, _>>()
1679                .context("Can't deserialize data_files in iceberg sink metadata")?;
1680        } else {
1681            bail!("iceberg sink metadata should have data_files object");
1682        }
1683
1684        Ok(Self {
1685            schema_id: schema_id as i32,
1686            partition_spec_id: partition_spec_id as i32,
1687            data_files,
1688        })
1689    }
1690}
1691
1692impl<'a> TryFrom<&'a IcebergCommitResult> for SinkMetadata {
1693    type Error = SinkError;
1694
1695    fn try_from(value: &'a IcebergCommitResult) -> std::result::Result<SinkMetadata, Self::Error> {
1696        let json_data_files = serde_json::Value::Array(
1697            value
1698                .data_files
1699                .iter()
1700                .map(serde_json::to_value)
1701                .collect::<std::result::Result<Vec<serde_json::Value>, _>>()
1702                .context("Can't serialize data files to json")?,
1703        );
1704        let json_value = serde_json::Value::Object(
1705            vec![
1706                (
1707                    SCHEMA_ID.to_owned(),
1708                    serde_json::Value::Number(value.schema_id.into()),
1709                ),
1710                (
1711                    PARTITION_SPEC_ID.to_owned(),
1712                    serde_json::Value::Number(value.partition_spec_id.into()),
1713                ),
1714                (DATA_FILES.to_owned(), json_data_files),
1715            ]
1716            .into_iter()
1717            .collect(),
1718        );
1719        Ok(SinkMetadata {
1720            metadata: Some(Serialized(SerializedMetadata {
1721                metadata: serde_json::to_vec(&json_value)
1722                    .context("Can't serialize iceberg sink metadata")?,
1723            })),
1724        })
1725    }
1726}
1727
1728impl TryFrom<IcebergCommitResult> for Vec<u8> {
1729    type Error = SinkError;
1730
1731    fn try_from(value: IcebergCommitResult) -> std::result::Result<Vec<u8>, Self::Error> {
1732        let json_data_files = serde_json::Value::Array(
1733            value
1734                .data_files
1735                .iter()
1736                .map(serde_json::to_value)
1737                .collect::<std::result::Result<Vec<serde_json::Value>, _>>()
1738                .context("Can't serialize data files to json")?,
1739        );
1740        let json_value = serde_json::Value::Object(
1741            vec![
1742                (
1743                    SCHEMA_ID.to_owned(),
1744                    serde_json::Value::Number(value.schema_id.into()),
1745                ),
1746                (
1747                    PARTITION_SPEC_ID.to_owned(),
1748                    serde_json::Value::Number(value.partition_spec_id.into()),
1749                ),
1750                (DATA_FILES.to_owned(), json_data_files),
1751            ]
1752            .into_iter()
1753            .collect(),
1754        );
1755        Ok(serde_json::to_vec(&json_value).context("Can't serialize iceberg sink metadata")?)
1756    }
1757}
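
/// Coordinator-side committer: collects the `IcebergCommitResult`s reported by the sink
/// writers and appends their data files to the Iceberg table as one snapshot per checkpoint.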
1758pub struct IcebergSinkCommitter {
1759    catalog: Arc<dyn Catalog>,
1760    table: Table,
1761    pub last_commit_epoch: u64,
1762    pub(crate) sink_id: SinkId,
1763    pub(crate) config: IcebergConfig,
1764    pub(crate) param: SinkParam,
1765    commit_retry_num: u32,
1766    pub(crate) iceberg_compact_stat_sender: Option<UnboundedSender<IcebergSinkCompactionUpdate>>,
1767}
1768
1769impl IcebergSinkCommitter {
1770    // Reload the table and guarantee that its current schema_id and default partition_spec_id
1771    // match the given `schema_id` and `partition_spec_id`.
1772    async fn reload_table(
1773        catalog: &dyn Catalog,
1774        table_ident: &TableIdent,
1775        schema_id: i32,
1776        partition_spec_id: i32,
1777    ) -> Result<Table> {
1778        let table = catalog
1779            .load_table(table_ident)
1780            .await
1781            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
1782        if table.metadata().current_schema_id() != schema_id {
1783            return Err(SinkError::Iceberg(anyhow!(
1784                "Schema evolution not supported, expect schema id {}, but got {}",
1785                schema_id,
1786                table.metadata().current_schema_id()
1787            )));
1788        }
1789        if table.metadata().default_partition_spec_id() != partition_spec_id {
1790            return Err(SinkError::Iceberg(anyhow!(
1791                "Partition evolution not supported, expect partition spec id {}, but got {}",
1792                partition_spec_id,
1793                table.metadata().default_partition_spec_id()
1794            )));
1795        }
1796        Ok(table)
1797    }
1798}
1799
1800#[async_trait]
1801impl SinglePhaseCommitCoordinator for IcebergSinkCommitter {
1802    async fn init(&mut self) -> Result<()> {
1803        tracing::info!(
1804            "Sink id = {}: iceberg sink coordinator initializing.",
1805            self.param.sink_id
1806        );
1807
1808        Ok(())
1809    }
1810
1811    async fn commit(
1812        &mut self,
1813        epoch: u64,
1814        metadata: Vec<SinkMetadata>,
1815        add_columns: Option<Vec<Field>>,
1816    ) -> Result<()> {
1817        tracing::info!("Starting iceberg direct commit in epoch {epoch}");
1818
1819        let (write_results, snapshot_id) =
1820            match self.pre_commit_inner(epoch, metadata, add_columns)? {
1821                Some((write_results, snapshot_id)) => (write_results, snapshot_id),
1822                None => {
1823                    tracing::debug!(?epoch, "no data to commit");
1824                    return Ok(());
1825                }
1826            };
1827
1828        self.commit_iceberg_inner(epoch, write_results, snapshot_id)
1829            .await
1830    }
1831}
1832
1833#[async_trait]
1834impl TwoPhaseCommitCoordinator for IcebergSinkCommitter {
1835    async fn init(&mut self) -> Result<()> {
1836        tracing::info!(
1837            "Sink id = {}: iceberg sink coordinator initializing.",
1838            self.param.sink_id
1839        );
1840
1841        Ok(())
1842    }
1843
1844    async fn pre_commit(
1845        &mut self,
1846        epoch: u64,
1847        metadata: Vec<SinkMetadata>,
1848        add_columns: Option<Vec<Field>>,
1849    ) -> Result<Vec<u8>> {
1850        tracing::info!("Starting iceberg pre commit in epoch {epoch}");
1851
1852        let (write_results, snapshot_id) =
1853            match self.pre_commit_inner(epoch, metadata, add_columns)? {
1854                Some((write_results, snapshot_id)) => (write_results, snapshot_id),
1855                None => {
1856                    tracing::debug!(?epoch, "no data to commit");
1857                    return Ok(vec![]);
1858                }
1859            };
1860
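        // Pre-commit metadata layout: one JSON-serialized `IcebergCommitResult` per writer,
        // with the little-endian bytes of the pre-generated snapshot id appended as the last
        // element. `commit` pops that snapshot id back off before decoding the results.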
1861        let mut write_results_bytes = Vec::new();
1862        for each_parallelism_write_result in write_results {
1863            let each_parallelism_write_result_bytes: Vec<u8> =
1864                each_parallelism_write_result.try_into()?;
1865            write_results_bytes.push(each_parallelism_write_result_bytes);
1866        }
1867
1868        let snapshot_id_bytes: Vec<u8> = snapshot_id.to_le_bytes().to_vec();
1869        write_results_bytes.push(snapshot_id_bytes);
1870
1871        let pre_commit_metadata_bytes: Vec<u8> = serialize_metadata(write_results_bytes);
1872        Ok(pre_commit_metadata_bytes)
1873    }
1874
1875    async fn commit(&mut self, epoch: u64, commit_metadata: Vec<u8>) -> Result<()> {
1876        tracing::info!("Starting iceberg commit in epoch {epoch}");
1877        if commit_metadata.is_empty() {
1878            tracing::debug!(?epoch, "no data to commit");
1879            return Ok(());
1880        }
1881
1882        let mut write_results_bytes = deserialize_metadata(commit_metadata);
1883
1884        let snapshot_id_bytes = write_results_bytes.pop().unwrap();
1885        let snapshot_id = i64::from_le_bytes(
1886            snapshot_id_bytes
1887                .try_into()
1888                .map_err(|_| SinkError::Iceberg(anyhow!("Invalid snapshot id bytes")))?,
1889        );
1890
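        // Idempotency guard: if the pre-generated snapshot id already exists in the table
        // metadata, this batch was committed before a restart, so skip committing it again.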
1891        if self
1892            .is_snapshot_id_in_iceberg(&self.config, snapshot_id)
1893            .await?
1894        {
1895            tracing::info!(
1896                "Snapshot id {} already committed in iceberg table, skip committing again.",
1897                snapshot_id
1898            );
1899            return Ok(());
1900        }
1901
1902        let mut write_results = vec![];
1903        for each in write_results_bytes {
1904            let write_result = IcebergCommitResult::try_from_serialized_bytes(each)?;
1905            write_results.push(write_result);
1906        }
1907
1908        self.commit_iceberg_inner(epoch, write_results, snapshot_id)
1909            .await?;
1910
1911        Ok(())
1912    }
1913
1914    async fn abort(&mut self, _epoch: u64, _commit_metadata: Vec<u8>) {
1915        // TODO: Files that have been written but not committed should be deleted.
1916        tracing::debug!("Abort not implemented yet");
1917    }
1918}
1919
1920/// Methods Required to Achieve Exactly Once Semantics
1921impl IcebergSinkCommitter {
1922    fn pre_commit_inner(
1923        &mut self,
1924        _epoch: u64,
1925        metadata: Vec<SinkMetadata>,
1926        add_columns: Option<Vec<Field>>,
1927    ) -> Result<Option<(Vec<IcebergCommitResult>, i64)>> {
1928        if let Some(add_columns) = add_columns {
1929            return Err(anyhow!(
1930                "Iceberg sink does not support adding columns, but got: {:?}",
1931                add_columns
1932            )
1933            .into());
1934        }
1935
1936        let write_results: Vec<IcebergCommitResult> = metadata
1937            .iter()
1938            .map(IcebergCommitResult::try_from)
1939            .collect::<Result<Vec<IcebergCommitResult>>>()?;
1940
1941        // Skip if no data to commit
1942        if write_results.is_empty() || write_results.iter().all(|r| r.data_files.is_empty()) {
1943            return Ok(None);
1944        }
1945
1946        let expect_schema_id = write_results[0].schema_id;
1947        let expect_partition_spec_id = write_results[0].partition_spec_id;
1948
1949        // guarantee that all write results have the same schema_id and partition_spec_id
1950        if write_results
1951            .iter()
1952            .any(|r| r.schema_id != expect_schema_id)
1953            || write_results
1954                .iter()
1955                .any(|r| r.partition_spec_id != expect_partition_spec_id)
1956        {
1957            return Err(SinkError::Iceberg(anyhow!(
1958                "schema_id and partition_spec_id should be the same in all write results"
1959            )));
1960        }
1961
1962        let snapshot_id = FastAppendAction::generate_snapshot_id(&self.table);
1963
1964        Ok(Some((write_results, snapshot_id)))
1965    }
1966
1967    async fn commit_iceberg_inner(
1968        &mut self,
1969        epoch: u64,
1970        write_results: Vec<IcebergCommitResult>,
1971        snapshot_id: i64,
1972    ) -> Result<()> {
1973        // Empty write results should be handled before calling this function.
1974        assert!(
1975            !write_results.is_empty() && !write_results.iter().all(|r| r.data_files.is_empty())
1976        );
1977
1978        // Check snapshot limit before proceeding with commit
1979        self.wait_for_snapshot_limit().await?;
1980
1981        let expect_schema_id = write_results[0].schema_id;
1982        let expect_partition_spec_id = write_results[0].partition_spec_id;
1983
1984        // Load the latest table to avoid concurrent modification, on a best-effort basis.
1985        self.table = Self::reload_table(
1986            self.catalog.as_ref(),
1987            self.table.identifier(),
1988            expect_schema_id,
1989            expect_partition_spec_id,
1990        )
1991        .await?;
1992
1993        let Some(schema) = self.table.metadata().schema_by_id(expect_schema_id) else {
1994            return Err(SinkError::Iceberg(anyhow!(
1995                "Can't find schema by id {}",
1996                expect_schema_id
1997            )));
1998        };
1999        let Some(partition_spec) = self
2000            .table
2001            .metadata()
2002            .partition_spec_by_id(expect_partition_spec_id)
2003        else {
2004            return Err(SinkError::Iceberg(anyhow!(
2005                "Can't find partition spec by id {}",
2006                expect_partition_spec_id
2007            )));
2008        };
2009        let partition_type = partition_spec
2010            .as_ref()
2011            .clone()
2012            .partition_type(schema)
2013            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
2014
2015        let data_files = write_results
2016            .into_iter()
2017            .flat_map(|r| {
2018                r.data_files.into_iter().map(|f| {
2019                    f.try_into(expect_partition_spec_id, &partition_type, schema)
2020                        .map_err(|err| SinkError::Iceberg(anyhow!(err)))
2021                })
2022            })
2023            .collect::<Result<Vec<DataFile>>>()?;
2024        // # TODO:
2025        // This retry behavior should be reverted and handled in iceberg-rust once it supports retry (tracked in: https://github.com/apache/iceberg-rust/issues/964),
2026        // because the retry logic involves reapplying the commit metadata.
2027        // For now, we just retry the commit operation.
2028        let retry_strategy = ExponentialBackoff::from_millis(10)
2029            .max_delay(Duration::from_secs(60))
2030            .map(jitter)
2031            .take(self.commit_retry_num as usize);
2032        let catalog = self.catalog.clone();
2033        let table_ident = self.table.identifier().clone();
2034        let table = Retry::spawn(retry_strategy, || async {
2035            let table = Self::reload_table(
2036                catalog.as_ref(),
2037                &table_ident,
2038                expect_schema_id,
2039                expect_partition_spec_id,
2040            )
2041            .await?;
2042            let txn = Transaction::new(&table);
2043            let append_action = txn
2044                .fast_append()
2045                .set_snapshot_id(snapshot_id)
2046                .set_target_branch(commit_branch(
2047                    self.config.r#type.as_str(),
2048                    self.config.write_mode,
2049                ))
2050                .add_data_files(data_files.clone());
2051
2052            let tx = append_action.apply(txn).map_err(|err| {
2053                let err: IcebergError = err.into();
2054                tracing::error!(error = %err.as_report(), "Failed to apply append action to iceberg transaction");
2055                SinkError::Iceberg(anyhow!(err))
2056            })?;
2057            tx.commit(self.catalog.as_ref()).await.map_err(|err| {
2058                let err: IcebergError = err.into();
2059                tracing::error!(error = %err.as_report(), "Failed to commit iceberg table");
2060                SinkError::Iceberg(anyhow!(err))
2061            })
2062        })
2063        .await?;
2064        self.table = table;
2065
2066        let snapshot_num = self.table.metadata().snapshots().count();
2067        let catalog_name = self.config.common.catalog_name();
2068        let table_name = self.table.identifier().to_string();
2069        let metrics_labels = [&self.param.sink_name, &catalog_name, &table_name];
2070        GLOBAL_SINK_METRICS
2071            .iceberg_snapshot_num
2072            .with_guarded_label_values(&metrics_labels)
2073            .set(snapshot_num as i64);
2074
2075        tracing::info!("Succeeded to commit to iceberg table in epoch {epoch}.");
2076
2077        if let Some(iceberg_compact_stat_sender) = &self.iceberg_compact_stat_sender
2078            && self.config.enable_compaction
2079            && iceberg_compact_stat_sender
2080                .send(IcebergSinkCompactionUpdate {
2081                    sink_id: self.sink_id,
2082                    compaction_interval: self.config.compaction_interval_sec(),
2083                    force_compaction: false,
2084                })
2085                .is_err()
2086        {
2087            warn!("failed to send iceberg compaction stats");
2088        }
2089
2090        Ok(())
2091    }
2092
2093    /// During pre-commit, we record the `snapshot_id` corresponding to each batch of files.
2094    /// Therefore, checking whether all files in a batch are already present in Iceberg reduces
2095    /// to verifying whether their corresponding `snapshot_id` exists in the Iceberg table.
2096    async fn is_snapshot_id_in_iceberg(
2097        &self,
2098        iceberg_config: &IcebergConfig,
2099        snapshot_id: i64,
2100    ) -> Result<bool> {
2101        let table = iceberg_config.load_table().await?;
2102        Ok(table.metadata().snapshot_by_id(snapshot_id).is_some())
2107    }
2108
2109    /// Count the snapshots created since the last rewrite/overwrite (`Replace`) operation.
2110    /// If no such operation is found, returns the total number of snapshots.
2111    fn count_snapshots_since_rewrite(&self) -> usize {
2112        let mut snapshots: Vec<_> = self.table.metadata().snapshots().collect();
2113        snapshots.sort_by_key(|b| std::cmp::Reverse(b.timestamp_ms()));
2114
2115        // Iterate through snapshots in reverse order (newest first) to find the last rewrite/overwrite
2116        let mut count = 0;
2117        for snapshot in snapshots {
2118            // Check if this snapshot represents a rewrite or overwrite operation
2119            let summary = snapshot.summary();
2120            match &summary.operation {
2121                Operation::Replace => {
2122                    // Found a rewrite/overwrite operation, stop counting
2123                    break;
2124                }
2125
2126                _ => {
2127                    // Increment count for each snapshot that is not a rewrite/overwrite
2128                    count += 1;
2129                }
2130            }
2131        }
2132
2133        count
2134    }
2135
2136    /// Wait until snapshot count since last rewrite is below the limit
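    /// While waiting, it nudges the compaction scheduler with `force_compaction: true`, then
    /// reloads the table and re-checks every 30 seconds.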
2137    async fn wait_for_snapshot_limit(&mut self) -> Result<()> {
2138        if !self.config.enable_compaction {
2139            return Ok(());
2140        }
2141
2142        if let Some(max_snapshots) = self.config.max_snapshots_num_before_compaction {
2143            loop {
2144                let current_count = self.count_snapshots_since_rewrite();
2145
2146                if current_count < max_snapshots {
2147                    tracing::info!(
2148                        "Snapshot count check passed: {} < {}",
2149                        current_count,
2150                        max_snapshots
2151                    );
2152                    break;
2153                }
2154
2155                tracing::info!(
2156                    "Snapshot count {} exceeds limit {}, waiting...",
2157                    current_count,
2158                    max_snapshots
2159                );
2160
2161                if let Some(iceberg_compact_stat_sender) = &self.iceberg_compact_stat_sender
2162                    && iceberg_compact_stat_sender
2163                        .send(IcebergSinkCompactionUpdate {
2164                            sink_id: self.sink_id,
2165                            compaction_interval: self.config.compaction_interval_sec(),
2166                            force_compaction: true,
2167                        })
2168                        .is_err()
2169                {
2170                    tracing::warn!("failed to send iceberg compaction stats");
2171                }
2172
2173                // Wait for 30 seconds before checking again
2174                tokio::time::sleep(Duration::from_secs(30)).await;
2175
2176                // Reload table to get latest snapshots
2177                self.table = self.config.load_table().await?;
2178            }
2179        }
2180        Ok(())
2181    }
2182}
2183
2184const MAP_KEY: &str = "key";
2185const MAP_VALUE: &str = "value";
2186
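/// Collect the nested RisingWave field name -> type mapping that corresponds to the given
/// Arrow complex type, so that `check_compatibility` can recurse into structs, lists and maps.
/// Returns `None` for non-complex Arrow types.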
2187fn get_fields<'a>(
2188    our_field_type: &'a risingwave_common::types::DataType,
2189    data_type: &ArrowDataType,
2190    schema_fields: &mut HashMap<&'a str, &'a risingwave_common::types::DataType>,
2191) -> Option<ArrowFields> {
2192    match data_type {
2193        ArrowDataType::Struct(fields) => {
2194            match our_field_type {
2195                risingwave_common::types::DataType::Struct(struct_fields) => {
2196                    struct_fields.iter().for_each(|(name, data_type)| {
2197                        let res = schema_fields.insert(name, data_type);
2198                        // This assert is to make sure there is no duplicate field name in the schema.
2199                        assert!(res.is_none())
2200                    });
2201                }
2202                risingwave_common::types::DataType::Map(map_fields) => {
2203                    schema_fields.insert(MAP_KEY, map_fields.key());
2204                    schema_fields.insert(MAP_VALUE, map_fields.value());
2205                }
2206                risingwave_common::types::DataType::List(list) => {
2207                    list.elem()
2208                        .as_struct()
2209                        .iter()
2210                        .for_each(|(name, data_type)| {
2211                            let res = schema_fields.insert(name, data_type);
2212                            // This assert is to make sure there is no duplicate field name in the schema.
2213                            assert!(res.is_none())
2214                        });
2215                }
2216                _ => {}
2217            };
2218            Some(fields.clone())
2219        }
2220        ArrowDataType::List(field) | ArrowDataType::Map(field, _) => {
2221            get_fields(our_field_type, field.data_type(), schema_fields)
2222        }
2223        _ => None, // not a supported complex type and unlikely to show up
2224    }
2225}
2226
2227fn check_compatibility(
2228    schema_fields: HashMap<&str, &risingwave_common::types::DataType>,
2229    fields: &ArrowFields,
2230) -> anyhow::Result<bool> {
2231    for arrow_field in fields {
2232        let our_field_type = schema_fields
2233            .get(arrow_field.name().as_str())
2234            .ok_or_else(|| anyhow!("Field {} not found in our schema", arrow_field.name()))?;
2235
2236        // Iceberg source should be able to read iceberg decimal type.
2237        let converted_arrow_data_type = IcebergArrowConvert
2238            .to_arrow_field("", our_field_type)
2239            .map_err(|e| anyhow!(e))?
2240            .data_type()
2241            .clone();
2242
2243        let compatible = match (&converted_arrow_data_type, arrow_field.data_type()) {
2244            (ArrowDataType::Decimal128(_, _), ArrowDataType::Decimal128(_, _)) => true,
2245            (ArrowDataType::Binary, ArrowDataType::LargeBinary) => true,
2246            (ArrowDataType::LargeBinary, ArrowDataType::Binary) => true,
2247            (ArrowDataType::List(_), ArrowDataType::List(field))
2248            | (ArrowDataType::Map(_, _), ArrowDataType::Map(field, _)) => {
2249                let mut schema_fields = HashMap::new();
2250                get_fields(our_field_type, field.data_type(), &mut schema_fields)
2251                    .is_none_or(|fields| check_compatibility(schema_fields, &fields).unwrap())
2252            }
2253            // validate nested structs
2254            (ArrowDataType::Struct(_), ArrowDataType::Struct(fields)) => {
2255                let mut schema_fields = HashMap::new();
2256                our_field_type
2257                    .as_struct()
2258                    .iter()
2259                    .for_each(|(name, data_type)| {
2260                        let res = schema_fields.insert(name, data_type);
2261                        // This assert is to make sure there is no duplicate field name in the schema.
2262                        assert!(res.is_none())
2263                    });
2264                check_compatibility(schema_fields, fields)?
2265            }
2266            // cases where left != right (metadata, field name mismatch)
2267            //
2268            // all nested types: in iceberg `field_id` will always be present, but RW doesn't have it:
2269            // {"PARQUET:field_id": ".."}
2270            //
2271            // map: The standard name in arrow is "entries", "key", "value".
2272            // in iceberg-rs, it's called "key_value"
2273            (left, right) => left.equals_datatype(right),
2274        };
2275        if !compatible {
2276            bail!(
2277                "field {}'s type is incompatible\nRisingWave converted data type: {}\niceberg's data type: {}",
2278                arrow_field.name(),
2279                converted_arrow_data_type,
2280                arrow_field.data_type()
2281            );
2282        }
2283    }
2284    Ok(true)
2285}
2286
2287/// Check that the RisingWave schema is compatible with the given Iceberg (Arrow) schema.
2288pub fn try_matches_arrow_schema(rw_schema: &Schema, arrow_schema: &ArrowSchema) -> Result<()> {
2289    if rw_schema.fields.len() != arrow_schema.fields().len() {
2290        bail!(
2291            "Schema field count mismatch: risingwave has {}, but iceberg has {}",
2292            rw_schema.fields.len(),
2293            arrow_schema.fields.len()
2294        );
2295    }
2296
2297    let mut schema_fields = HashMap::new();
2298    rw_schema.fields.iter().for_each(|field| {
2299        let res = schema_fields.insert(field.name.as_str(), &field.data_type);
2300        // This assert is to make sure there is no duplicate field name in the schema.
2301        assert!(res.is_none())
2302    });
2303
2304    check_compatibility(schema_fields, &arrow_schema.fields)?;
2305    Ok(())
2306}
2307
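/// Round-trip helpers for the coordinator's pre-commit metadata: a list of per-writer byte
/// blobs encoded as a single JSON array. A minimal usage sketch:
///
/// ```ignore
/// let blobs = vec![b"writer-0".to_vec(), b"writer-1".to_vec()];
/// let bytes = serialize_metadata(blobs.clone());
/// assert_eq!(deserialize_metadata(bytes), blobs);
/// ```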
2308pub fn serialize_metadata(metadata: Vec<Vec<u8>>) -> Vec<u8> {
2309    serde_json::to_vec(&metadata).unwrap()
2310}
2311
2312pub fn deserialize_metadata(bytes: Vec<u8>) -> Vec<Vec<u8>> {
2313    serde_json::from_slice(&bytes).unwrap()
2314}
2315
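/// Parse a `partition_by` expression such as `"v1, bucket(5,v1), year(v3)"` into
/// `(column, Transform)` pairs. A rough sketch of the expected mapping:
///
/// ```ignore
/// let parts = parse_partition_by_exprs("v1, bucket(5,v1), year(v3)".to_owned())?;
/// // => [("v1", Identity), ("v1", Bucket(5)), ("v3", Year)]
/// ```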
2316pub fn parse_partition_by_exprs(
2317    expr: String,
2318) -> std::result::Result<Vec<(String, Transform)>, anyhow::Error> {
2319    // captures column, transform(column), transform(n,column), transform(n, column)
2320    let re = Regex::new(r"(?<transform>\w+)(\(((?<n>\d+)?(?:,|(,\s)))?(?<field>\w+)\))?").unwrap();
2321    if !re.is_match(&expr) {
2322        bail!(format!(
2323            "Invalid partition fields: {}\nHINT: Supported formats are column, transform(column), transform(n,column), transform(n, column)",
2324            expr
2325        ))
2326    }
2327    let caps = re.captures_iter(&expr);
2328
2329    let mut partition_columns = vec![];
2330
2331    for mat in caps {
2332        let (column, transform) = if mat.name("n").is_none() && mat.name("field").is_none() {
2333            (&mat["transform"], Transform::Identity)
2334        } else {
2335            let mut func = mat["transform"].to_owned();
2336            if func == "bucket" || func == "truncate" {
2337                let n = &mat
2338                    .name("n")
2339                    .ok_or_else(|| anyhow!("The `n` must be set with `bucket` and `truncate`"))?
2340                    .as_str();
2341                func = format!("{func}[{n}]");
2342            }
2343            (
2344                &mat["field"],
2345                Transform::from_str(&func)
2346                    .with_context(|| format!("invalid transform function {}", func))?,
2347            )
2348        };
2349        partition_columns.push((column.to_owned(), transform));
2350    }
2351    Ok(partition_columns)
2352}
2353
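/// Copy-on-write upsert sinks commit to the dedicated `ICEBERG_COW_BRANCH`; all other sinks
/// commit to Iceberg's `MAIN_BRANCH`.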
2354pub fn commit_branch(sink_type: &str, write_mode: IcebergWriteMode) -> String {
2355    if should_enable_iceberg_cow(sink_type, write_mode) {
2356        ICEBERG_COW_BRANCH.to_owned()
2357    } else {
2358        MAIN_BRANCH.to_owned()
2359    }
2360}
2361
2362pub fn should_enable_iceberg_cow(sink_type: &str, write_mode: IcebergWriteMode) -> bool {
2363    sink_type == SINK_TYPE_UPSERT && write_mode == IcebergWriteMode::CopyOnWrite
2364}
2365
2366impl crate::with_options::WithOptions for IcebergWriteMode {}
2367
2368impl crate::with_options::WithOptions for CompactionType {}
2369
2370#[cfg(test)]
2371mod test {
2372    use std::collections::BTreeMap;
2373
2374    use risingwave_common::array::arrow::arrow_schema_iceberg::FieldRef as ArrowFieldRef;
2375    use risingwave_common::types::{DataType, MapType, StructType};
2376
2377    use crate::connector_common::{IcebergCommon, IcebergTableIdentifier};
2378    use crate::sink::decouple_checkpoint_log_sink::ICEBERG_DEFAULT_COMMIT_CHECKPOINT_INTERVAL;
2379    use crate::sink::iceberg::{
2380        COMPACTION_INTERVAL_SEC, COMPACTION_MAX_SNAPSHOTS_NUM, CompactionType, ENABLE_COMPACTION,
2381        ENABLE_SNAPSHOT_EXPIRATION, IcebergConfig, IcebergWriteMode,
2382        SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_FILES, SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_META_DATA,
2383        SNAPSHOT_EXPIRATION_MAX_AGE_MILLIS, SNAPSHOT_EXPIRATION_RETAIN_LAST, WRITE_MODE,
2384    };
2385
2386    pub const DEFAULT_ICEBERG_COMPACTION_INTERVAL: u64 = 3600; // 1 hour
2387
2388    #[test]
2389    fn test_compatible_arrow_schema() {
2390        use arrow_schema_iceberg::{DataType as ArrowDataType, Field as ArrowField};
2391
2392        use super::*;
2393        let risingwave_schema = Schema::new(vec![
2394            Field::with_name(DataType::Int32, "a"),
2395            Field::with_name(DataType::Int32, "b"),
2396            Field::with_name(DataType::Int32, "c"),
2397        ]);
2398        let arrow_schema = ArrowSchema::new(vec![
2399            ArrowField::new("a", ArrowDataType::Int32, false),
2400            ArrowField::new("b", ArrowDataType::Int32, false),
2401            ArrowField::new("c", ArrowDataType::Int32, false),
2402        ]);
2403
2404        try_matches_arrow_schema(&risingwave_schema, &arrow_schema).unwrap();
2405
2406        let risingwave_schema = Schema::new(vec![
2407            Field::with_name(DataType::Int32, "d"),
2408            Field::with_name(DataType::Int32, "c"),
2409            Field::with_name(DataType::Int32, "a"),
2410            Field::with_name(DataType::Int32, "b"),
2411        ]);
2412        let arrow_schema = ArrowSchema::new(vec![
2413            ArrowField::new("a", ArrowDataType::Int32, false),
2414            ArrowField::new("b", ArrowDataType::Int32, false),
2415            ArrowField::new("d", ArrowDataType::Int32, false),
2416            ArrowField::new("c", ArrowDataType::Int32, false),
2417        ]);
2418        try_matches_arrow_schema(&risingwave_schema, &arrow_schema).unwrap();
2419
2420        let risingwave_schema = Schema::new(vec![
2421            Field::with_name(
2422                DataType::Struct(StructType::new(vec![
2423                    ("a1", DataType::Int32),
2424                    (
2425                        "a2",
2426                        DataType::Struct(StructType::new(vec![
2427                            ("a21", DataType::Bytea),
2428                            (
2429                                "a22",
2430                                DataType::Map(MapType::from_kv(DataType::Varchar, DataType::Jsonb)),
2431                            ),
2432                        ])),
2433                    ),
2434                ])),
2435                "a",
2436            ),
2437            Field::with_name(
2438                DataType::list(DataType::Struct(StructType::new(vec![
2439                    ("b1", DataType::Int32),
2440                    ("b2", DataType::Bytea),
2441                    (
2442                        "b3",
2443                        DataType::Map(MapType::from_kv(DataType::Varchar, DataType::Jsonb)),
2444                    ),
2445                ]))),
2446                "b",
2447            ),
2448            Field::with_name(
2449                DataType::Map(MapType::from_kv(
2450                    DataType::Varchar,
2451                    DataType::list(DataType::Struct(StructType::new([
2452                        ("c1", DataType::Int32),
2453                        ("c2", DataType::Bytea),
2454                        (
2455                            "c3",
2456                            DataType::Map(MapType::from_kv(DataType::Varchar, DataType::Jsonb)),
2457                        ),
2458                    ]))),
2459                )),
2460                "c",
2461            ),
2462        ]);
2463        let arrow_schema = ArrowSchema::new(vec![
2464            ArrowField::new(
2465                "a",
2466                ArrowDataType::Struct(ArrowFields::from(vec![
2467                    ArrowField::new("a1", ArrowDataType::Int32, false),
2468                    ArrowField::new(
2469                        "a2",
2470                        ArrowDataType::Struct(ArrowFields::from(vec![
2471                            ArrowField::new("a21", ArrowDataType::LargeBinary, false),
2472                            ArrowField::new_map(
2473                                "a22",
2474                                "entries",
2475                                ArrowFieldRef::new(ArrowField::new(
2476                                    "key",
2477                                    ArrowDataType::Utf8,
2478                                    false,
2479                                )),
2480                                ArrowFieldRef::new(ArrowField::new(
2481                                    "value",
2482                                    ArrowDataType::Utf8,
2483                                    false,
2484                                )),
2485                                false,
2486                                false,
2487                            ),
2488                        ])),
2489                        false,
2490                    ),
2491                ])),
2492                false,
2493            ),
2494            ArrowField::new(
2495                "b",
2496                ArrowDataType::List(ArrowFieldRef::new(ArrowField::new_list_field(
2497                    ArrowDataType::Struct(ArrowFields::from(vec![
2498                        ArrowField::new("b1", ArrowDataType::Int32, false),
2499                        ArrowField::new("b2", ArrowDataType::LargeBinary, false),
2500                        ArrowField::new_map(
2501                            "b3",
2502                            "entries",
2503                            ArrowFieldRef::new(ArrowField::new("key", ArrowDataType::Utf8, false)),
2504                            ArrowFieldRef::new(ArrowField::new(
2505                                "value",
2506                                ArrowDataType::Utf8,
2507                                false,
2508                            )),
2509                            false,
2510                            false,
2511                        ),
2512                    ])),
2513                    false,
2514                ))),
2515                false,
2516            ),
2517            ArrowField::new_map(
2518                "c",
2519                "entries",
2520                ArrowFieldRef::new(ArrowField::new("key", ArrowDataType::Utf8, false)),
2521                ArrowFieldRef::new(ArrowField::new(
2522                    "value",
2523                    ArrowDataType::List(ArrowFieldRef::new(ArrowField::new_list_field(
2524                        ArrowDataType::Struct(ArrowFields::from(vec![
2525                            ArrowField::new("c1", ArrowDataType::Int32, false),
2526                            ArrowField::new("c2", ArrowDataType::LargeBinary, false),
2527                            ArrowField::new_map(
2528                                "c3",
2529                                "entries",
2530                                ArrowFieldRef::new(ArrowField::new(
2531                                    "key",
2532                                    ArrowDataType::Utf8,
2533                                    false,
2534                                )),
2535                                ArrowFieldRef::new(ArrowField::new(
2536                                    "value",
2537                                    ArrowDataType::Utf8,
2538                                    false,
2539                                )),
2540                                false,
2541                                false,
2542                            ),
2543                        ])),
2544                        false,
2545                    ))),
2546                    false,
2547                )),
2548                false,
2549                false,
2550            ),
2551        ]);
2552        try_matches_arrow_schema(&risingwave_schema, &arrow_schema).unwrap();
2553    }
2554
2555    #[test]
2556    fn test_parse_iceberg_config() {
2557        let values = [
2558            ("connector", "iceberg"),
2559            ("type", "upsert"),
2560            ("primary_key", "v1"),
2561            ("partition_by", "v1, identity(v1), truncate(4,v2), bucket(5,v1), year(v3), month(v4), day(v5), hour(v6), void(v1)"),
2562            ("warehouse.path", "s3://iceberg"),
2563            ("s3.endpoint", "http://127.0.0.1:9301"),
2564            ("s3.access.key", "hummockadmin"),
2565            ("s3.secret.key", "hummockadmin"),
2566            ("s3.path.style.access", "true"),
2567            ("s3.region", "us-east-1"),
2568            ("catalog.type", "jdbc"),
2569            ("catalog.name", "demo"),
2570            ("catalog.uri", "jdbc://postgresql://postgres:5432/iceberg"),
2571            ("catalog.jdbc.user", "admin"),
2572            ("catalog.jdbc.password", "123456"),
2573            ("database.name", "demo_db"),
2574            ("table.name", "demo_table"),
2575            ("enable_compaction", "true"),
2576            ("compaction_interval_sec", "1800"),
2577            ("enable_snapshot_expiration", "true"),
2578        ]
2579        .into_iter()
2580        .map(|(k, v)| (k.to_owned(), v.to_owned()))
2581        .collect();
2582
2583        let iceberg_config = IcebergConfig::from_btreemap(values).unwrap();
2584
2585        let expected_iceberg_config = IcebergConfig {
2586            common: IcebergCommon {
2587                warehouse_path: Some("s3://iceberg".to_owned()),
2588                catalog_uri: Some("jdbc://postgresql://postgres:5432/iceberg".to_owned()),
2589                s3_region: Some("us-east-1".to_owned()),
2590                s3_endpoint: Some("http://127.0.0.1:9301".to_owned()),
2591                s3_access_key: Some("hummockadmin".to_owned()),
2592                s3_secret_key: Some("hummockadmin".to_owned()),
2593                s3_iam_role_arn: None,
2594                gcs_credential: None,
2595                catalog_type: Some("jdbc".to_owned()),
2596                glue_id: None,
2597                glue_region: None,
2598                glue_access_key: None,
2599                glue_secret_key: None,
2600                glue_iam_role_arn: None,
2601                catalog_name: Some("demo".to_owned()),
2602                s3_path_style_access: Some(true),
2603                catalog_credential: None,
2604                catalog_oauth2_server_uri: None,
2605                catalog_scope: None,
2606                catalog_token: None,
2607                enable_config_load: None,
2608                rest_signing_name: None,
2609                rest_signing_region: None,
2610                rest_sigv4_enabled: None,
2611                hosted_catalog: None,
2612                azblob_account_name: None,
2613                azblob_account_key: None,
2614                azblob_endpoint_url: None,
2615                catalog_header: None,
2616                adlsgen2_account_name: None,
2617                adlsgen2_account_key: None,
2618                adlsgen2_endpoint: None,
2619                vended_credentials: None,
2620            },
2621            table: IcebergTableIdentifier {
2622                database_name: Some("demo_db".to_owned()),
2623                table_name: "demo_table".to_owned(),
2624            },
2625            r#type: "upsert".to_owned(),
2626            force_append_only: false,
2627            primary_key: Some(vec!["v1".to_owned()]),
2628            partition_by: Some("v1, identity(v1), truncate(4,v2), bucket(5,v1), year(v3), month(v4), day(v5), hour(v6), void(v1)".to_owned()),
2629            java_catalog_props: [("jdbc.user", "admin"), ("jdbc.password", "123456")]
2630                .into_iter()
2631                .map(|(k, v)| (k.to_owned(), v.to_owned()))
2632                .collect(),
2633            commit_checkpoint_interval: ICEBERG_DEFAULT_COMMIT_CHECKPOINT_INTERVAL,
2634            create_table_if_not_exists: false,
2635            is_exactly_once: Some(true),
2636            commit_retry_num: 8,
2637            enable_compaction: true,
2638            compaction_interval_sec: Some(DEFAULT_ICEBERG_COMPACTION_INTERVAL / 2),
2639            enable_snapshot_expiration: true,
2640            write_mode: IcebergWriteMode::MergeOnRead,
2641            snapshot_expiration_max_age_millis: None,
2642            snapshot_expiration_retain_last: None,
2643            snapshot_expiration_clear_expired_files: true,
2644            snapshot_expiration_clear_expired_meta_data: true,
2645            max_snapshots_num_before_compaction: None,
2646            small_files_threshold_mb: None,
2647            delete_files_count_threshold: None,
2648            trigger_snapshot_count: None,
2649            target_file_size_mb: None,
2650            compaction_type: None,
2651        };
2652
2653        assert_eq!(iceberg_config, expected_iceberg_config);
2654
2655        assert_eq!(
2656            &iceberg_config.full_table_name().unwrap().to_string(),
2657            "demo_db.demo_table"
2658        );
2659    }
2660
2661    async fn test_create_catalog(configs: BTreeMap<String, String>) {
2662        let iceberg_config = IcebergConfig::from_btreemap(configs).unwrap();
2663
2664        let _table = iceberg_config.load_table().await.unwrap();
2665    }
2666
2667    #[tokio::test]
2668    #[ignore]
2669    async fn test_storage_catalog() {
2670        let values = [
2671            ("connector", "iceberg"),
2672            ("type", "append-only"),
2673            ("force_append_only", "true"),
2674            ("s3.endpoint", "http://127.0.0.1:9301"),
2675            ("s3.access.key", "hummockadmin"),
2676            ("s3.secret.key", "hummockadmin"),
2677            ("s3.region", "us-east-1"),
2678            ("s3.path.style.access", "true"),
2679            ("catalog.name", "demo"),
2680            ("catalog.type", "storage"),
2681            ("warehouse.path", "s3://icebergdata/demo"),
2682            ("database.name", "s1"),
2683            ("table.name", "t1"),
2684        ]
2685        .into_iter()
2686        .map(|(k, v)| (k.to_owned(), v.to_owned()))
2687        .collect();
2688
2689        test_create_catalog(values).await;
2690    }
2691
2692    #[tokio::test]
2693    #[ignore]
2694    async fn test_rest_catalog() {
2695        let values = [
2696            ("connector", "iceberg"),
2697            ("type", "append-only"),
2698            ("force_append_only", "true"),
2699            ("s3.endpoint", "http://127.0.0.1:9301"),
2700            ("s3.access.key", "hummockadmin"),
2701            ("s3.secret.key", "hummockadmin"),
2702            ("s3.region", "us-east-1"),
2703            ("s3.path.style.access", "true"),
2704            ("catalog.name", "demo"),
2705            ("catalog.type", "rest"),
2706            ("catalog.uri", "http://192.168.167.4:8181"),
2707            ("warehouse.path", "s3://icebergdata/demo"),
2708            ("database.name", "s1"),
2709            ("table.name", "t1"),
2710        ]
2711        .into_iter()
2712        .map(|(k, v)| (k.to_owned(), v.to_owned()))
2713        .collect();
2714
2715        test_create_catalog(values).await;
2716    }
2717
2718    #[tokio::test]
2719    #[ignore]
2720    async fn test_jdbc_catalog() {
2721        let values = [
2722            ("connector", "iceberg"),
2723            ("type", "append-only"),
2724            ("force_append_only", "true"),
2725            ("s3.endpoint", "http://127.0.0.1:9301"),
2726            ("s3.access.key", "hummockadmin"),
2727            ("s3.secret.key", "hummockadmin"),
2728            ("s3.region", "us-east-1"),
2729            ("s3.path.style.access", "true"),
2730            ("catalog.name", "demo"),
2731            ("catalog.type", "jdbc"),
2732            ("catalog.uri", "jdbc:postgresql://localhost:5432/iceberg"),
2733            ("catalog.jdbc.user", "admin"),
2734            ("catalog.jdbc.password", "123456"),
2735            ("warehouse.path", "s3://icebergdata/demo"),
2736            ("database.name", "s1"),
2737            ("table.name", "t1"),
2738        ]
2739        .into_iter()
2740        .map(|(k, v)| (k.to_owned(), v.to_owned()))
2741        .collect();
2742
2743        test_create_catalog(values).await;
2744    }
2745
2746    #[tokio::test]
2747    #[ignore]
2748    async fn test_hive_catalog() {
2749        let values = [
2750            ("connector", "iceberg"),
2751            ("type", "append-only"),
2752            ("force_append_only", "true"),
2753            ("s3.endpoint", "http://127.0.0.1:9301"),
2754            ("s3.access.key", "hummockadmin"),
2755            ("s3.secret.key", "hummockadmin"),
2756            ("s3.region", "us-east-1"),
2757            ("s3.path.style.access", "true"),
2758            ("catalog.name", "demo"),
2759            ("catalog.type", "hive"),
2760            ("catalog.uri", "thrift://localhost:9083"),
2761            ("warehouse.path", "s3://icebergdata/demo"),
2762            ("database.name", "s1"),
2763            ("table.name", "t1"),
2764        ]
2765        .into_iter()
2766        .map(|(k, v)| (k.to_owned(), v.to_owned()))
2767        .collect();
2768
2769        test_create_catalog(values).await;
2770    }
2771
2772    #[test]
2773    fn test_config_constants_consistency() {
2774        // This test ensures our constants match the expected configuration names
2775        // If you change a constant, this test will remind you to update both places
2776        assert_eq!(ENABLE_COMPACTION, "enable_compaction");
2777        assert_eq!(COMPACTION_INTERVAL_SEC, "compaction_interval_sec");
2778        assert_eq!(ENABLE_SNAPSHOT_EXPIRATION, "enable_snapshot_expiration");
2779        assert_eq!(WRITE_MODE, "write_mode");
2780        assert_eq!(
2781            SNAPSHOT_EXPIRATION_RETAIN_LAST,
2782            "snapshot_expiration_retain_last"
2783        );
2784        assert_eq!(
2785            SNAPSHOT_EXPIRATION_MAX_AGE_MILLIS,
2786            "snapshot_expiration_max_age_millis"
2787        );
2788        assert_eq!(
2789            SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_FILES,
2790            "snapshot_expiration_clear_expired_files"
2791        );
2792        assert_eq!(
2793            SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_META_DATA,
2794            "snapshot_expiration_clear_expired_meta_data"
2795        );
2796        assert_eq!(COMPACTION_MAX_SNAPSHOTS_NUM, "compaction.max_snapshots_num");
2797    }
2798
2799    #[test]
2800    fn test_parse_iceberg_compaction_config() {
2801        // Test parsing with new compaction.* prefix config names
2802        let values = [
2803            ("connector", "iceberg"),
2804            ("type", "upsert"),
2805            ("primary_key", "id"),
2806            ("warehouse.path", "s3://iceberg"),
2807            ("s3.endpoint", "http://127.0.0.1:9301"),
2808            ("s3.access.key", "test"),
2809            ("s3.secret.key", "test"),
2810            ("s3.region", "us-east-1"),
2811            ("catalog.type", "storage"),
2812            ("catalog.name", "demo"),
2813            ("database.name", "test_db"),
2814            ("table.name", "test_table"),
2815            ("enable_compaction", "true"),
2816            ("compaction.max_snapshots_num", "100"),
2817            ("compaction.small_files_threshold_mb", "512"),
2818            ("compaction.delete_files_count_threshold", "50"),
2819            ("compaction.trigger_snapshot_count", "10"),
2820            ("compaction.target_file_size_mb", "256"),
2821            ("compaction.type", "full"),
2822        ]
2823        .into_iter()
2824        .map(|(k, v)| (k.to_owned(), v.to_owned()))
2825        .collect();
2826
2827        let iceberg_config = IcebergConfig::from_btreemap(values).unwrap();
2828
2829        // Verify all compaction config fields are parsed correctly
2830        assert!(iceberg_config.enable_compaction);
2831        assert_eq!(
2832            iceberg_config.max_snapshots_num_before_compaction,
2833            Some(100)
2834        );
2835        assert_eq!(iceberg_config.small_files_threshold_mb, Some(512));
2836        assert_eq!(iceberg_config.delete_files_count_threshold, Some(50));
2837        assert_eq!(iceberg_config.trigger_snapshot_count, Some(10));
2838        assert_eq!(iceberg_config.target_file_size_mb, Some(256));
2839        assert_eq!(iceberg_config.compaction_type, Some(CompactionType::Full));
2840    }
2841}