risingwave_connector/sink/iceberg/
mod.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod prometheus;
16use std::collections::{BTreeMap, HashMap};
17use std::fmt::Debug;
18use std::num::NonZeroU64;
19use std::str::FromStr;
20use std::sync::Arc;
21use std::time::Duration;
22
23use anyhow::{Context, anyhow};
24use async_trait::async_trait;
25use await_tree::InstrumentAwait;
26use iceberg::arrow::{
27    RecordBatchPartitionSplitter, arrow_schema_to_schema, schema_to_arrow_schema,
28};
29use iceberg::spec::{
30    DataFile, FormatVersion, MAIN_BRANCH, Operation, PartitionSpecRef,
31    SchemaRef as IcebergSchemaRef, SerializedDataFile, TableProperties, Transform,
32    UnboundPartitionField, UnboundPartitionSpec,
33};
34use iceberg::table::Table;
35use iceberg::transaction::{ApplyTransactionAction, FastAppendAction, Transaction};
36use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
37use iceberg::writer::base_writer::deletion_vector_writer::{
38    DeletionVectorWriter, DeletionVectorWriterBuilder,
39};
40use iceberg::writer::base_writer::equality_delete_writer::{
41    EqualityDeleteFileWriterBuilder, EqualityDeleteWriterConfig,
42};
43use iceberg::writer::base_writer::position_delete_file_writer::{
44    POSITION_DELETE_SCHEMA, PositionDeleteFileWriter, PositionDeleteFileWriterBuilder,
45    PositionDeleteInput,
46};
47use iceberg::writer::delta_writer::{DELETE_OP, DeltaWriterBuilder, INSERT_OP};
48use iceberg::writer::file_writer::ParquetWriterBuilder;
49use iceberg::writer::file_writer::location_generator::{
50    DefaultFileNameGenerator, DefaultLocationGenerator,
51};
52use iceberg::writer::file_writer::rolling_writer::RollingFileWriterBuilder;
53use iceberg::writer::task_writer::TaskWriter;
54use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
55use iceberg::{Catalog, NamespaceIdent, TableCreation, TableIdent};
56use itertools::Itertools;
57use parquet::basic::Compression;
58use parquet::file::properties::WriterProperties;
59use prometheus::monitored_general_writer::MonitoredGeneralWriterBuilder;
60use regex::Regex;
61use risingwave_common::array::arrow::arrow_array_iceberg::{Int32Array, RecordBatch};
62use risingwave_common::array::arrow::arrow_schema_iceberg::{
63    self, DataType as ArrowDataType, Field as ArrowField, Fields as ArrowFields,
64    Schema as ArrowSchema, SchemaRef,
65};
66use risingwave_common::array::arrow::{IcebergArrowConvert, IcebergCreateTableArrowConvert};
67use risingwave_common::array::{Op, StreamChunk};
68use risingwave_common::bail;
69use risingwave_common::bitmap::Bitmap;
70use risingwave_common::catalog::{Field, Schema};
71use risingwave_common::error::IcebergError;
72use risingwave_common::metrics::{LabelGuardedHistogram, LabelGuardedIntCounter};
73use risingwave_common_estimate_size::EstimateSize;
74use risingwave_pb::connector_service::SinkMetadata;
75use risingwave_pb::connector_service::sink_metadata::Metadata::Serialized;
76use risingwave_pb::connector_service::sink_metadata::SerializedMetadata;
77use risingwave_pb::stream_plan::PbSinkSchemaChange;
78use serde::de::{self, Deserializer, Visitor};
79use serde::{Deserialize, Serialize};
80use serde_json::from_value;
81use serde_with::{DisplayFromStr, serde_as};
82use thiserror_ext::AsReport;
83use tokio::sync::mpsc::UnboundedSender;
84use tokio_retry::RetryIf;
85use tokio_retry::strategy::{ExponentialBackoff, jitter};
86use tracing::warn;
87use url::Url;
88use uuid::Uuid;
89use with_options::WithOptions;
90
91use super::decouple_checkpoint_log_sink::iceberg_default_commit_checkpoint_interval;
92use super::{
93    GLOBAL_SINK_METRICS, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT, Sink,
94    SinkError, SinkWriterParam,
95};
96use crate::connector_common::{IcebergCommon, IcebergSinkCompactionUpdate, IcebergTableIdentifier};
97use crate::enforce_secret::EnforceSecret;
98use crate::sink::catalog::SinkId;
99use crate::sink::coordinate::CoordinatedLogSinker;
100use crate::sink::writer::SinkWriter;
101use crate::sink::{
102    Result, SinglePhaseCommitCoordinator, SinkCommitCoordinator, SinkParam,
103    TwoPhaseCommitCoordinator,
104};
105use crate::{deserialize_bool_from_string, deserialize_optional_string_seq_from_string};
106
/// Connector name for the Iceberg sink.
pub const ICEBERG_SINK: &str = "iceberg";

// NOTE(review): presumably the branch written to in copy-on-write mode —
// usage is outside this chunk, confirm at call sites.
pub const ICEBERG_COW_BRANCH: &str = "ingestion";
// Accepted values for the `write_mode` option (see `IcebergWriteMode`).
pub const ICEBERG_WRITE_MODE_MERGE_ON_READ: &str = "merge-on-read";
pub const ICEBERG_WRITE_MODE_COPY_ON_WRITE: &str = "copy-on-write";
// Accepted values for the `compaction.type` option (see `CompactionType`).
pub const ICEBERG_COMPACTION_TYPE_FULL: &str = "full";
pub const ICEBERG_COMPACTION_TYPE_SMALL_FILES: &str = "small-files";
pub const ICEBERG_COMPACTION_TYPE_FILES_WITH_DELETE: &str = "files-with-delete";

/// First field id assigned to generated partition fields during table
/// creation; subsequent partition fields increment from this base.
pub const PARTITION_DATA_ID_START: i32 = 1000;
117
/// Write mode of the Iceberg sink, serialized in kebab-case
/// (`merge-on-read` / `copy-on-write`) to match the `write_mode` option.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
pub enum IcebergWriteMode {
    /// `merge-on-read` — the default mode.
    #[default]
    MergeOnRead,
    /// `copy-on-write` — rejected for append-only sinks
    /// (see `IcebergConfig::validate_append_only_write_mode`).
    CopyOnWrite,
}
125
126impl IcebergWriteMode {
127    pub fn as_str(self) -> &'static str {
128        match self {
129            IcebergWriteMode::MergeOnRead => ICEBERG_WRITE_MODE_MERGE_ON_READ,
130            IcebergWriteMode::CopyOnWrite => ICEBERG_WRITE_MODE_COPY_ON_WRITE,
131        }
132    }
133}
134
135impl std::str::FromStr for IcebergWriteMode {
136    type Err = SinkError;
137
138    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
139        match s {
140            ICEBERG_WRITE_MODE_MERGE_ON_READ => Ok(IcebergWriteMode::MergeOnRead),
141            ICEBERG_WRITE_MODE_COPY_ON_WRITE => Ok(IcebergWriteMode::CopyOnWrite),
142            _ => Err(SinkError::Config(anyhow!(format!(
143                "invalid write_mode: {}, must be one of: {}, {}",
144                s, ICEBERG_WRITE_MODE_MERGE_ON_READ, ICEBERG_WRITE_MODE_COPY_ON_WRITE
145            )))),
146        }
147    }
148}
149
150impl TryFrom<&str> for IcebergWriteMode {
151    type Error = <Self as std::str::FromStr>::Err;
152
153    fn try_from(value: &str) -> std::result::Result<Self, Self::Error> {
154        value.parse()
155    }
156}
157
158impl TryFrom<String> for IcebergWriteMode {
159    type Error = <Self as std::str::FromStr>::Err;
160
161    fn try_from(value: String) -> std::result::Result<Self, Self::Error> {
162        value.as_str().parse()
163    }
164}
165
166impl std::fmt::Display for IcebergWriteMode {
167    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
168        f.write_str(self.as_str())
169    }
170}
171
// Configuration constants
// These string keys mirror the `#[serde(rename = ...)]` option names on
// `IcebergConfig`, so other modules can refer to the options by name.
pub const ENABLE_COMPACTION: &str = "enable_compaction";
pub const COMPACTION_INTERVAL_SEC: &str = "compaction_interval_sec";
pub const ENABLE_SNAPSHOT_EXPIRATION: &str = "enable_snapshot_expiration";
pub const WRITE_MODE: &str = "write_mode";
pub const FORMAT_VERSION: &str = "format_version";
pub const SNAPSHOT_EXPIRATION_RETAIN_LAST: &str = "snapshot_expiration_retain_last";
pub const SNAPSHOT_EXPIRATION_MAX_AGE_MILLIS: &str = "snapshot_expiration_max_age_millis";
pub const SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_FILES: &str = "snapshot_expiration_clear_expired_files";
pub const SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_META_DATA: &str =
    "snapshot_expiration_clear_expired_meta_data";
pub const COMPACTION_MAX_SNAPSHOTS_NUM: &str = "compaction.max_snapshots_num";

pub const COMPACTION_SMALL_FILES_THRESHOLD_MB: &str = "compaction.small_files_threshold_mb";

pub const COMPACTION_DELETE_FILES_COUNT_THRESHOLD: &str = "compaction.delete_files_count_threshold";

pub const COMPACTION_TRIGGER_SNAPSHOT_COUNT: &str = "compaction.trigger_snapshot_count";

pub const COMPACTION_TARGET_FILE_SIZE_MB: &str = "compaction.target_file_size_mb";

pub const COMPACTION_TYPE: &str = "compaction.type";

pub const COMPACTION_WRITE_PARQUET_COMPRESSION: &str = "compaction.write_parquet_compression";
pub const COMPACTION_WRITE_PARQUET_MAX_ROW_GROUP_ROWS: &str =
    "compaction.write_parquet_max_row_group_rows";

// Writer-identification string built from the crate version at compile time.
// NOTE(review): presumably written as the Parquet `created_by` metadata via
// `WriterProperties` — usage is outside this chunk, confirm at call sites.
const PARQUET_CREATED_BY: &str = concat!("risingwave version ", env!("CARGO_PKG_VERSION"));
200
/// serde default for `commit_retry_num`: retry a failed commit up to 8 times.
fn default_commit_retry_num() -> u32 {
    8
}

/// serde default for `write_mode`: merge-on-read.
fn default_iceberg_write_mode() -> IcebergWriteMode {
    IcebergWriteMode::MergeOnRead
}

/// serde default for `format_version`: Iceberg format v2.
fn default_iceberg_format_version() -> FormatVersion {
    FormatVersion::V2
}

/// serde default helper returning `true`.
fn default_true() -> bool {
    true
}

/// serde default helper returning `Some(true)` (used by `is_exactly_once`).
fn default_some_true() -> Option<bool> {
    Some(true)
}
220
221fn parse_format_version_str(value: &str) -> std::result::Result<FormatVersion, String> {
222    let parsed = value
223        .trim()
224        .parse::<u8>()
225        .map_err(|_| "`format-version` must be one of 1, 2, or 3".to_owned())?;
226    match parsed {
227        1 => Ok(FormatVersion::V1),
228        2 => Ok(FormatVersion::V2),
229        3 => Ok(FormatVersion::V3),
230        _ => Err("`format-version` must be one of 1, 2, or 3".to_owned()),
231    }
232}
233
234fn deserialize_format_version<'de, D>(
235    deserializer: D,
236) -> std::result::Result<FormatVersion, D::Error>
237where
238    D: Deserializer<'de>,
239{
240    struct FormatVersionVisitor;
241
242    impl<'de> Visitor<'de> for FormatVersionVisitor {
243        type Value = FormatVersion;
244
245        fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
246            formatter.write_str("format-version as 1, 2, or 3")
247        }
248
249        fn visit_u64<E>(self, value: u64) -> std::result::Result<Self::Value, E>
250        where
251            E: de::Error,
252        {
253            let value = u8::try_from(value)
254                .map_err(|_| E::custom("`format-version` must be one of 1, 2, or 3"))?;
255            parse_format_version_str(&value.to_string()).map_err(E::custom)
256        }
257
258        fn visit_i64<E>(self, value: i64) -> std::result::Result<Self::Value, E>
259        where
260            E: de::Error,
261        {
262            let value = u8::try_from(value)
263                .map_err(|_| E::custom("`format-version` must be one of 1, 2, or 3"))?;
264            parse_format_version_str(&value.to_string()).map_err(E::custom)
265        }
266
267        fn visit_str<E>(self, value: &str) -> std::result::Result<Self::Value, E>
268        where
269            E: de::Error,
270        {
271            parse_format_version_str(value).map_err(E::custom)
272        }
273
274        fn visit_string<E>(self, value: String) -> std::result::Result<Self::Value, E>
275        where
276            E: de::Error,
277        {
278            self.visit_str(&value)
279        }
280    }
281
282    deserializer.deserialize_any(FormatVersionVisitor)
283}
284
/// Compaction type for Iceberg sink
/// Serialized in kebab-case (`full`, `small-files`, `files-with-delete`),
/// matching the accepted `compaction.type` option values.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(rename_all = "kebab-case")]
pub enum CompactionType {
    /// Full compaction - rewrites all data files
    #[default]
    Full,
    /// Small files compaction - only compact small files
    SmallFiles,
    /// Files with delete compaction - only compact files that have associated delete files
    FilesWithDelete,
}
297
298impl CompactionType {
299    pub fn as_str(&self) -> &'static str {
300        match self {
301            CompactionType::Full => ICEBERG_COMPACTION_TYPE_FULL,
302            CompactionType::SmallFiles => ICEBERG_COMPACTION_TYPE_SMALL_FILES,
303            CompactionType::FilesWithDelete => ICEBERG_COMPACTION_TYPE_FILES_WITH_DELETE,
304        }
305    }
306}
307
308impl std::str::FromStr for CompactionType {
309    type Err = SinkError;
310
311    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
312        match s {
313            ICEBERG_COMPACTION_TYPE_FULL => Ok(CompactionType::Full),
314            ICEBERG_COMPACTION_TYPE_SMALL_FILES => Ok(CompactionType::SmallFiles),
315            ICEBERG_COMPACTION_TYPE_FILES_WITH_DELETE => Ok(CompactionType::FilesWithDelete),
316            _ => Err(SinkError::Config(anyhow!(format!(
317                "invalid compaction_type: {}, must be one of: {}, {}, {}",
318                s,
319                ICEBERG_COMPACTION_TYPE_FULL,
320                ICEBERG_COMPACTION_TYPE_SMALL_FILES,
321                ICEBERG_COMPACTION_TYPE_FILES_WITH_DELETE
322            )))),
323        }
324    }
325}
326
327impl TryFrom<&str> for CompactionType {
328    type Error = <Self as std::str::FromStr>::Err;
329
330    fn try_from(value: &str) -> std::result::Result<Self, Self::Error> {
331        value.parse()
332    }
333}
334
335impl TryFrom<String> for CompactionType {
336    type Error = <Self as std::str::FromStr>::Err;
337
338    fn try_from(value: String) -> std::result::Result<Self, Self::Error> {
339        value.as_str().parse()
340    }
341}
342
343impl std::fmt::Display for CompactionType {
344    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
345        write!(f, "{}", self.as_str())
346    }
347}
348
/// Configuration of the Iceberg sink, deserialized from the sink's options
/// via serde. Catalog/connection and table-identifier options are flattened
/// from `IcebergCommon` / `IcebergTableIdentifier`; options starting with
/// `catalog.` (minus a few reserved keys) are routed into
/// `java_catalog_props` by `IcebergConfig::from_btreemap`.
#[serde_as]
#[derive(Debug, Clone, PartialEq, Eq, Deserialize, WithOptions)]
pub struct IcebergConfig {
    /// Sink type; must be `append-only` or `upsert` (validated in `from_btreemap`).
    pub r#type: String, // accept "append-only" or "upsert"

    /// Force append-only writes; parsed from a string boolean, defaults to false.
    #[serde(default, deserialize_with = "deserialize_bool_from_string")]
    pub force_append_only: bool,

    /// Shared Iceberg catalog/connection options, flattened into the top level.
    #[serde(flatten)]
    common: IcebergCommon,

    /// Target table identifier, flattened into the top level.
    #[serde(flatten)]
    table: IcebergTableIdentifier,

    /// Primary key columns; required and non-empty for `upsert` sinks.
    #[serde(
        rename = "primary_key",
        default,
        deserialize_with = "deserialize_optional_string_seq_from_string"
    )]
    pub primary_key: Option<Vec<String>>,

    // Props for java catalog props.
    // Populated by `from_btreemap` from options starting with "catalog."
    // (excluding a few reserved keys); not deserialized directly.
    #[serde(skip)]
    pub java_catalog_props: HashMap<String, String>,

    /// Partition-by expression used when creating the table
    /// (parsed by `parse_partition_by_exprs`).
    #[serde(default)]
    pub partition_by: Option<String>,

    /// Commit every n(>0) checkpoints, default is 60.
    #[serde(default = "iceberg_default_commit_checkpoint_interval")]
    #[serde_as(as = "DisplayFromStr")]
    #[with_option(allow_alter_on_fly)]
    pub commit_checkpoint_interval: u64,

    /// Create the Iceberg table on startup if it does not already exist.
    #[serde(default, deserialize_with = "deserialize_bool_from_string")]
    pub create_table_if_not_exists: bool,

    /// Whether it is `exactly_once`, the default is true.
    #[serde(default = "default_some_true")]
    #[serde_as(as = "Option<DisplayFromStr>")]
    pub is_exactly_once: Option<bool>,
    // Retry commit num when iceberg commit fail. default is 8.
    // # TODO
    // Iceberg table may store the retry commit num in table meta.
    // We should try to find and use that as default commit retry num first.
    #[serde(default = "default_commit_retry_num")]
    pub commit_retry_num: u32,

    /// Whether to enable iceberg compaction.
    #[serde(
        rename = "enable_compaction",
        default,
        deserialize_with = "deserialize_bool_from_string"
    )]
    #[with_option(allow_alter_on_fly)]
    pub enable_compaction: bool,

    /// The interval of iceberg compaction, in seconds.
    /// Defaults to 3600 via `compaction_interval_sec()` when unset.
    #[serde(rename = "compaction_interval_sec", default)]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    pub compaction_interval_sec: Option<u64>,

    /// Whether to enable iceberg expired snapshots.
    #[serde(
        rename = "enable_snapshot_expiration",
        default,
        deserialize_with = "deserialize_bool_from_string"
    )]
    #[with_option(allow_alter_on_fly)]
    pub enable_snapshot_expiration: bool,

    /// The iceberg write mode, can be `merge-on-read` or `copy-on-write`.
    #[serde(rename = "write_mode", default = "default_iceberg_write_mode")]
    pub write_mode: IcebergWriteMode,

    /// Iceberg format version for table creation.
    /// Accepts 1, 2 or 3, numeric or string; defaults to 2.
    #[serde(
        rename = "format_version",
        default = "default_iceberg_format_version",
        deserialize_with = "deserialize_format_version"
    )]
    pub format_version: FormatVersion,

    /// The maximum age (in milliseconds) for snapshots before they expire
    /// For example, if set to 3600000, snapshots older than 1 hour will be expired
    #[serde(rename = "snapshot_expiration_max_age_millis", default)]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    pub snapshot_expiration_max_age_millis: Option<i64>,

    /// The number of snapshots to retain
    #[serde(rename = "snapshot_expiration_retain_last", default)]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    pub snapshot_expiration_retain_last: Option<i32>,

    /// Whether to clear expired files during snapshot expiration; defaults to true.
    #[serde(
        rename = "snapshot_expiration_clear_expired_files",
        default = "default_true",
        deserialize_with = "deserialize_bool_from_string"
    )]
    #[with_option(allow_alter_on_fly)]
    pub snapshot_expiration_clear_expired_files: bool,

    /// Whether to clear expired metadata during snapshot expiration; defaults to true.
    #[serde(
        rename = "snapshot_expiration_clear_expired_meta_data",
        default = "default_true",
        deserialize_with = "deserialize_bool_from_string"
    )]
    #[with_option(allow_alter_on_fly)]
    pub snapshot_expiration_clear_expired_meta_data: bool,

    /// The maximum number of snapshots allowed since the last rewrite operation
    /// If set, sink will check snapshot count and wait if exceeded
    #[serde(rename = "compaction.max_snapshots_num", default)]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    pub max_snapshots_num_before_compaction: Option<usize>,

    /// Size threshold (MB) for small-file compaction;
    /// defaults to 64 via `small_files_threshold_mb()` when unset.
    #[serde(rename = "compaction.small_files_threshold_mb", default)]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    pub small_files_threshold_mb: Option<u64>,

    /// Delete-file count threshold for compaction;
    /// defaults to 256 via `delete_files_count_threshold()` when unset.
    #[serde(rename = "compaction.delete_files_count_threshold", default)]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    pub delete_files_count_threshold: Option<usize>,

    /// Snapshot count that triggers compaction; effectively disabled
    /// (`usize::MAX`) when unset — see `trigger_snapshot_count()`.
    #[serde(rename = "compaction.trigger_snapshot_count", default)]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    pub trigger_snapshot_count: Option<usize>,

    /// Target output file size (MB) for compaction;
    /// defaults to 1024 via `target_file_size_mb()` when unset.
    #[serde(rename = "compaction.target_file_size_mb", default)]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    pub target_file_size_mb: Option<u64>,

    /// Compaction type: `full`, `small-files`, or `files-with-delete`
    /// If not set, will default to `full`
    #[serde(rename = "compaction.type", default)]
    #[with_option(allow_alter_on_fly)]
    pub compaction_type: Option<CompactionType>,

    /// Parquet compression codec
    /// Supported values: uncompressed, snappy, gzip, lzo, brotli, lz4, zstd
    /// Default is zstd (see `write_parquet_compression()`); an unrecognized
    /// codec string falls back to snappy at parse time.
    #[serde(rename = "compaction.write_parquet_compression", default)]
    #[with_option(allow_alter_on_fly)]
    pub write_parquet_compression: Option<String>,

    /// Maximum number of rows in a Parquet row group
    /// Default is 122880 (from developer config)
    #[serde(rename = "compaction.write_parquet_max_row_group_rows", default)]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    pub write_parquet_max_row_group_rows: Option<usize>,
}
509
510impl EnforceSecret for IcebergConfig {
511    fn enforce_secret<'a>(
512        prop_iter: impl Iterator<Item = &'a str>,
513    ) -> crate::error::ConnectorResult<()> {
514        for prop in prop_iter {
515            IcebergCommon::enforce_one(prop)?;
516        }
517        Ok(())
518    }
519
520    fn enforce_one(prop: &str) -> crate::error::ConnectorResult<()> {
521        IcebergCommon::enforce_one(prop)
522    }
523}
524
525impl IcebergConfig {
526    /// Validate that append-only sinks use merge-on-read mode
527    /// Copy-on-write is strictly worse than merge-on-read for append-only workloads
528    fn validate_append_only_write_mode(
529        sink_type: &str,
530        write_mode: IcebergWriteMode,
531    ) -> Result<()> {
532        if sink_type == SINK_TYPE_APPEND_ONLY && write_mode == IcebergWriteMode::CopyOnWrite {
533            return Err(SinkError::Config(anyhow!(
534                "'copy-on-write' mode is not supported for append-only iceberg sink. \
535                 Please use 'merge-on-read' instead, which is strictly better for append-only workloads."
536            )));
537        }
538        Ok(())
539    }
540
541    pub fn from_btreemap(values: BTreeMap<String, String>) -> Result<Self> {
542        let mut config =
543            serde_json::from_value::<IcebergConfig>(serde_json::to_value(&values).unwrap())
544                .map_err(|e| SinkError::Config(anyhow!(e)))?;
545
546        if config.r#type != SINK_TYPE_APPEND_ONLY && config.r#type != SINK_TYPE_UPSERT {
547            return Err(SinkError::Config(anyhow!(
548                "`{}` must be {}, or {}",
549                SINK_TYPE_OPTION,
550                SINK_TYPE_APPEND_ONLY,
551                SINK_TYPE_UPSERT
552            )));
553        }
554
555        if config.r#type == SINK_TYPE_UPSERT {
556            if let Some(primary_key) = &config.primary_key {
557                if primary_key.is_empty() {
558                    return Err(SinkError::Config(anyhow!(
559                        "`primary-key` must not be empty in {}",
560                        SINK_TYPE_UPSERT
561                    )));
562                }
563            } else {
564                return Err(SinkError::Config(anyhow!(
565                    "Must set `primary-key` in {}",
566                    SINK_TYPE_UPSERT
567                )));
568            }
569        }
570
571        // Enforce merge-on-read for append-only sinks
572        Self::validate_append_only_write_mode(&config.r#type, config.write_mode)?;
573
574        // All configs start with "catalog." will be treated as java configs.
575        config.java_catalog_props = values
576            .iter()
577            .filter(|(k, _v)| {
578                k.starts_with("catalog.")
579                    && k != &"catalog.uri"
580                    && k != &"catalog.type"
581                    && k != &"catalog.name"
582                    && k != &"catalog.header"
583            })
584            .map(|(k, v)| (k[8..].to_string(), v.clone()))
585            .collect();
586
587        if config.commit_checkpoint_interval == 0 {
588            return Err(SinkError::Config(anyhow!(
589                "`commit-checkpoint-interval` must be greater than 0"
590            )));
591        }
592
593        Ok(config)
594    }
595
596    pub fn catalog_type(&self) -> &str {
597        self.common.catalog_type()
598    }
599
600    pub async fn load_table(&self) -> Result<Table> {
601        self.common
602            .load_table(&self.table, &self.java_catalog_props)
603            .await
604            .map_err(Into::into)
605    }
606
607    pub async fn create_catalog(&self) -> Result<Arc<dyn Catalog>> {
608        self.common
609            .create_catalog(&self.java_catalog_props)
610            .await
611            .map_err(Into::into)
612    }
613
614    pub fn full_table_name(&self) -> Result<TableIdent> {
615        self.table.to_table_ident().map_err(Into::into)
616    }
617
618    pub fn catalog_name(&self) -> String {
619        self.common.catalog_name()
620    }
621
622    pub fn table_format_version(&self) -> FormatVersion {
623        self.format_version
624    }
625
626    pub fn compaction_interval_sec(&self) -> u64 {
627        // default to 1 hour
628        self.compaction_interval_sec.unwrap_or(3600)
629    }
630
631    /// Calculate the timestamp (in milliseconds) before which snapshots should be expired
632    /// Returns `current_time_ms` - `max_age_millis`
633    pub fn snapshot_expiration_timestamp_ms(&self, current_time_ms: i64) -> Option<i64> {
634        self.snapshot_expiration_max_age_millis
635            .map(|max_age_millis| current_time_ms - max_age_millis)
636    }
637
638    pub fn trigger_snapshot_count(&self) -> usize {
639        self.trigger_snapshot_count.unwrap_or(usize::MAX)
640    }
641
642    pub fn small_files_threshold_mb(&self) -> u64 {
643        self.small_files_threshold_mb.unwrap_or(64)
644    }
645
646    pub fn delete_files_count_threshold(&self) -> usize {
647        self.delete_files_count_threshold.unwrap_or(256)
648    }
649
650    pub fn target_file_size_mb(&self) -> u64 {
651        self.target_file_size_mb.unwrap_or(1024)
652    }
653
654    /// Get the compaction type as an enum
655    /// This method parses the string and returns the enum value
656    pub fn compaction_type(&self) -> CompactionType {
657        self.compaction_type.unwrap_or_default()
658    }
659
660    /// Get the parquet compression codec
661    /// Default is "zstd"
662    pub fn write_parquet_compression(&self) -> &str {
663        self.write_parquet_compression.as_deref().unwrap_or("zstd")
664    }
665
666    /// Get the maximum number of rows in a Parquet row group
667    /// Default is 122880 (from developer config default)
668    pub fn write_parquet_max_row_group_rows(&self) -> usize {
669        self.write_parquet_max_row_group_rows.unwrap_or(122880)
670    }
671
672    /// Parse the compression codec string into Parquet Compression enum
673    /// Returns SNAPPY as default if parsing fails or not specified
674    pub fn get_parquet_compression(&self) -> Compression {
675        parse_parquet_compression(self.write_parquet_compression())
676    }
677}
678
679/// Parse compression codec string to Parquet Compression enum
680fn parse_parquet_compression(codec: &str) -> Compression {
681    match codec.to_lowercase().as_str() {
682        "uncompressed" => Compression::UNCOMPRESSED,
683        "snappy" => Compression::SNAPPY,
684        "gzip" => Compression::GZIP(Default::default()),
685        "lzo" => Compression::LZO,
686        "brotli" => Compression::BROTLI(Default::default()),
687        "lz4" => Compression::LZ4,
688        "zstd" => Compression::ZSTD(Default::default()),
689        _ => {
690            tracing::warn!(
691                "Unknown compression codec '{}', falling back to SNAPPY",
692                codec
693            );
694            Compression::SNAPPY
695        }
696    }
697}
698
/// Iceberg sink connector: the parsed configuration plus the original sink
/// parameters (used for schema validation and table creation).
pub struct IcebergSink {
    pub config: IcebergConfig,
    param: SinkParam,
    // In upsert mode, this is never `None` or empty.
    unique_column_ids: Option<Vec<usize>>,
}
705
706impl EnforceSecret for IcebergSink {
707    fn enforce_secret<'a>(
708        prop_iter: impl Iterator<Item = &'a str>,
709    ) -> crate::error::ConnectorResult<()> {
710        for prop in prop_iter {
711            IcebergConfig::enforce_one(prop)?;
712        }
713        Ok(())
714    }
715}
716
717impl TryFrom<SinkParam> for IcebergSink {
718    type Error = SinkError;
719
720    fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
721        let config = IcebergConfig::from_btreemap(param.properties.clone())?;
722        IcebergSink::new(config, param)
723    }
724}
725
impl Debug for IcebergSink {
    // Only `config` is included; `param` and `unique_column_ids` are omitted
    // from the debug output.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("IcebergSink")
            .field("config", &self.config)
            .finish()
    }
}
733
734async fn create_and_validate_table_impl(
735    config: &IcebergConfig,
736    param: &SinkParam,
737) -> Result<Table> {
738    if config.create_table_if_not_exists {
739        create_table_if_not_exists_impl(config, param).await?;
740    }
741
742    let table = config
743        .load_table()
744        .await
745        .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
746
747    let sink_schema = param.schema();
748    let iceberg_arrow_schema = schema_to_arrow_schema(table.metadata().current_schema())
749        .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
750
751    try_matches_arrow_schema(&sink_schema, &iceberg_arrow_schema)
752        .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
753
754    Ok(table)
755}
756
757async fn create_table_if_not_exists_impl(config: &IcebergConfig, param: &SinkParam) -> Result<()> {
758    let catalog = config.create_catalog().await?;
759    let namespace = if let Some(database_name) = config.table.database_name() {
760        let namespace = NamespaceIdent::new(database_name.to_owned());
761        if !catalog
762            .namespace_exists(&namespace)
763            .await
764            .map_err(|e| SinkError::Iceberg(anyhow!(e)))?
765        {
766            catalog
767                .create_namespace(&namespace, HashMap::default())
768                .await
769                .map_err(|e| SinkError::Iceberg(anyhow!(e)))
770                .context("failed to create iceberg namespace")?;
771        }
772        namespace
773    } else {
774        bail!("database name must be set if you want to create table")
775    };
776
777    let table_id = config
778        .full_table_name()
779        .context("Unable to parse table name")?;
780    if !catalog
781        .table_exists(&table_id)
782        .await
783        .map_err(|e| SinkError::Iceberg(anyhow!(e)))?
784    {
785        let iceberg_create_table_arrow_convert = IcebergCreateTableArrowConvert::default();
786        // convert risingwave schema -> arrow schema -> iceberg schema
787        let arrow_fields = param
788            .columns
789            .iter()
790            .map(|column| {
791                Ok(iceberg_create_table_arrow_convert
792                    .to_arrow_field(&column.name, &column.data_type)
793                    .map_err(|e| SinkError::Iceberg(anyhow!(e)))
794                    .context(format!(
795                        "failed to convert {}: {} to arrow type",
796                        &column.name, &column.data_type
797                    ))?)
798            })
799            .collect::<Result<Vec<ArrowField>>>()?;
800        let arrow_schema = arrow_schema_iceberg::Schema::new(arrow_fields);
801        let iceberg_schema = iceberg::arrow::arrow_schema_to_schema(&arrow_schema)
802            .map_err(|e| SinkError::Iceberg(anyhow!(e)))
803            .context("failed to convert arrow schema to iceberg schema")?;
804
805        let location = {
806            let mut names = namespace.clone().inner();
807            names.push(config.table.table_name().to_owned());
808            match &config.common.warehouse_path {
809                Some(warehouse_path) => {
810                    let is_s3_tables = warehouse_path.starts_with("arn:aws:s3tables");
811                    // BigLake catalog federation uses bq:// prefix for BigQuery-managed Iceberg tables
812                    let is_bq_catalog_federation = warehouse_path.starts_with("bq://");
813                    let url = Url::parse(warehouse_path);
814                    if url.is_err() || is_s3_tables || is_bq_catalog_federation {
815                        // For rest catalog, the warehouse_path could be a warehouse name.
816                        // In this case, we should specify the location when creating a table.
817                        if config.common.catalog_type() == "rest"
818                            || config.common.catalog_type() == "rest_rust"
819                        {
820                            None
821                        } else {
822                            bail!(format!("Invalid warehouse path: {}", warehouse_path))
823                        }
824                    } else if warehouse_path.ends_with('/') {
825                        Some(format!("{}{}", warehouse_path, names.join("/")))
826                    } else {
827                        Some(format!("{}/{}", warehouse_path, names.join("/")))
828                    }
829                }
830                None => None,
831            }
832        };
833
834        let partition_spec = match &config.partition_by {
835            Some(partition_by) => {
836                let mut partition_fields = Vec::<UnboundPartitionField>::new();
837                for (i, (column, transform)) in parse_partition_by_exprs(partition_by.clone())?
838                    .into_iter()
839                    .enumerate()
840                {
841                    match iceberg_schema.field_id_by_name(&column) {
842                        Some(id) => partition_fields.push(
843                            UnboundPartitionField::builder()
844                                .source_id(id)
845                                .transform(transform)
846                                .name(format!("_p_{}", column))
847                                .field_id(PARTITION_DATA_ID_START + i as i32)
848                                .build(),
849                        ),
850                        None => bail!(format!(
851                            "Partition source column does not exist in schema: {}",
852                            column
853                        )),
854                    };
855                }
856                Some(
857                    UnboundPartitionSpec::builder()
858                        .with_spec_id(0)
859                        .add_partition_fields(partition_fields)
860                        .map_err(|e| SinkError::Iceberg(anyhow!(e)))
861                        .context("failed to add partition columns")?
862                        .build(),
863                )
864            }
865            None => None,
866        };
867
868        // Put format-version into table properties, because catalog like jdbc extract format-version from table properties.
869        let properties = HashMap::from([(
870            TableProperties::PROPERTY_FORMAT_VERSION.to_owned(),
871            (config.format_version as u8).to_string(),
872        )]);
873
874        let table_creation_builder = TableCreation::builder()
875            .name(config.table.table_name().to_owned())
876            .schema(iceberg_schema)
877            .format_version(config.table_format_version())
878            .properties(properties);
879
880        let table_creation = match (location, partition_spec) {
881            (Some(location), Some(partition_spec)) => table_creation_builder
882                .location(location)
883                .partition_spec(partition_spec)
884                .build(),
885            (Some(location), None) => table_creation_builder.location(location).build(),
886            (None, Some(partition_spec)) => table_creation_builder
887                .partition_spec(partition_spec)
888                .build(),
889            (None, None) => table_creation_builder.build(),
890        };
891
892        catalog
893            .create_table(&namespace, table_creation)
894            .await
895            .map_err(|e| SinkError::Iceberg(anyhow!(e)))
896            .context("failed to create iceberg table")?;
897    }
898    Ok(())
899}
900
901impl IcebergSink {
902    pub async fn create_and_validate_table(&self) -> Result<Table> {
903        create_and_validate_table_impl(&self.config, &self.param).await
904    }
905
906    pub async fn create_table_if_not_exists(&self) -> Result<()> {
907        create_table_if_not_exists_impl(&self.config, &self.param).await
908    }
909
910    pub fn new(config: IcebergConfig, param: SinkParam) -> Result<Self> {
911        let unique_column_ids = if config.r#type == SINK_TYPE_UPSERT && !config.force_append_only {
912            if let Some(pk) = &config.primary_key {
913                let mut unique_column_ids = Vec::with_capacity(pk.len());
914                for col_name in pk {
915                    let id = param
916                        .columns
917                        .iter()
918                        .find(|col| col.name.as_str() == col_name)
919                        .ok_or_else(|| {
920                            SinkError::Config(anyhow!(
921                                "Primary key column {} not found in sink schema",
922                                col_name
923                            ))
924                        })?
925                        .column_id
926                        .get_id() as usize;
927                    unique_column_ids.push(id);
928                }
929                Some(unique_column_ids)
930            } else {
931                unreachable!()
932            }
933        } else {
934            None
935        };
936        Ok(Self {
937            config,
938            param,
939            unique_column_ids,
940        })
941    }
942}
943
impl Sink for IcebergSink {
    type LogSinker = CoordinatedLogSinker<IcebergSinkWriter>;

    const SINK_NAME: &'static str = ICEBERG_SINK;

    /// Validate the sink configuration before the sink starts:
    /// catalog support, license gating, write-mode/compaction-type
    /// compatibility, and finally that the target table can be loaded and
    /// matches the sink schema.
    async fn validate(&self) -> Result<()> {
        // Snowflake catalogs are read-only from RisingWave's perspective.
        if "snowflake".eq_ignore_ascii_case(self.config.catalog_type()) {
            bail!("Snowflake catalog only supports iceberg sources");
        }

        // Sinking through a Glue catalog is a licensed feature.
        if "glue".eq_ignore_ascii_case(self.config.catalog_type()) {
            risingwave_common::license::Feature::IcebergSinkWithGlue
                .check_available()
                .map_err(|e| anyhow::anyhow!(e))?;
        }

        // Enforce merge-on-read for append-only tables
        IcebergConfig::validate_append_only_write_mode(
            &self.config.r#type,
            self.config.write_mode,
        )?;

        // Validate compaction type configuration
        let compaction_type = self.config.compaction_type();

        // Check COW mode constraints
        // COW mode only supports 'full' compaction type
        if self.config.write_mode == IcebergWriteMode::CopyOnWrite
            && compaction_type != CompactionType::Full
        {
            bail!(
                "'copy-on-write' mode only supports 'full' compaction type, got: '{}'",
                compaction_type
            );
        }

        // Per-compaction-type constraints: license, write mode, and mutually
        // exclusive tuning knobs.
        match compaction_type {
            CompactionType::SmallFiles => {
                // 1. check license
                risingwave_common::license::Feature::IcebergCompaction
                    .check_available()
                    .map_err(|e| anyhow::anyhow!(e))?;

                // 2. check write mode
                if self.config.write_mode != IcebergWriteMode::MergeOnRead {
                    bail!(
                        "'small-files' compaction type only supports 'merge-on-read' write mode, got: '{}'",
                        self.config.write_mode
                    );
                }

                // 3. check conflicting parameters
                if self.config.delete_files_count_threshold.is_some() {
                    bail!(
                        "`compaction.delete-files-count-threshold` is not supported for 'small-files' compaction type"
                    );
                }
            }
            CompactionType::FilesWithDelete => {
                // 1. check license
                risingwave_common::license::Feature::IcebergCompaction
                    .check_available()
                    .map_err(|e| anyhow::anyhow!(e))?;

                // 2. check write mode
                if self.config.write_mode != IcebergWriteMode::MergeOnRead {
                    bail!(
                        "'files-with-delete' compaction type only supports 'merge-on-read' write mode, got: '{}'",
                        self.config.write_mode
                    );
                }

                // 3. check conflicting parameters
                if self.config.small_files_threshold_mb.is_some() {
                    bail!(
                        "`compaction.small-files-threshold-mb` must not be set for 'files-with-delete' compaction type"
                    );
                }
            }
            CompactionType::Full => {
                // Full compaction has no special requirements
            }
        }

        // Loading the table doubles as a schema-compatibility check.
        let _ = self.create_and_validate_table().await?;
        Ok(())
    }

    fn support_schema_change() -> bool {
        true
    }

    /// Validate a proposed `ALTER SINK` config change without applying it.
    /// Only sanity-checks value ranges; existence of the table is not
    /// re-verified here.
    fn validate_alter_config(config: &BTreeMap<String, String>) -> Result<()> {
        let iceberg_config = IcebergConfig::from_btreemap(config.clone())?;

        // Validate compaction interval
        if let Some(compaction_interval) = iceberg_config.compaction_interval_sec {
            if iceberg_config.enable_compaction && compaction_interval == 0 {
                bail!(
                    "`compaction-interval-sec` must be greater than 0 when `enable-compaction` is true"
                );
            }

            // log compaction_interval
            tracing::info!(
                "Alter config compaction_interval set to {} seconds",
                compaction_interval
            );
        }

        // Validate max snapshots
        if let Some(max_snapshots) = iceberg_config.max_snapshots_num_before_compaction
            && max_snapshots < 1
        {
            bail!(
                "`compaction.max-snapshots-num` must be greater than 0, got: {}",
                max_snapshots
            );
        }

        // Validate target file size
        // NOTE(review): this message spells the option with underscores while
        // the messages above use kebab-case — confirm against the actual
        // option name in IcebergConfig.
        if let Some(target_file_size_mb) = iceberg_config.target_file_size_mb
            && target_file_size_mb == 0
        {
            bail!("`compaction.target_file_size_mb` must be greater than 0");
        }

        // Validate parquet max row group rows
        if let Some(max_row_group_rows) = iceberg_config.write_parquet_max_row_group_rows
            && max_row_group_rows == 0
        {
            bail!("`compaction.write_parquet_max_row_group_rows` must be greater than 0");
        }

        // Validate parquet compression codec
        if let Some(ref compression) = iceberg_config.write_parquet_compression {
            let valid_codecs = [
                "uncompressed",
                "snappy",
                "gzip",
                "lzo",
                "brotli",
                "lz4",
                "zstd",
            ];
            if !valid_codecs.contains(&compression.to_lowercase().as_str()) {
                bail!(
                    "`compaction.write_parquet_compression` must be one of {:?}, got: {}",
                    valid_codecs,
                    compression
                );
            }
        }

        Ok(())
    }

    /// Build the coordinated log sinker that drives `IcebergSinkWriter`,
    /// committing once every `commit_checkpoint_interval` checkpoints.
    async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
        let writer = IcebergSinkWriter::new(
            self.config.clone(),
            self.param.clone(),
            writer_param.clone(),
            self.unique_column_ids.clone(),
        );

        let commit_checkpoint_interval =
            NonZeroU64::new(self.config.commit_checkpoint_interval).expect(
                "commit_checkpoint_interval should be greater than 0, and it should be checked in config validation",
            );
        let log_sinker = CoordinatedLogSinker::new(
            &writer_param,
            self.param.clone(),
            writer,
            commit_checkpoint_interval,
        )
        .await?;

        Ok(log_sinker)
    }

    fn is_coordinated_sink(&self) -> bool {
        true
    }

    /// Build the commit coordinator. Exactly-once sinks get a two-phase
    /// coordinator; otherwise a single-phase one is used.
    async fn new_coordinator(
        &self,
        iceberg_compact_stat_sender: Option<UnboundedSender<IcebergSinkCompactionUpdate>>,
    ) -> Result<SinkCommitCoordinator> {
        let catalog = self.config.create_catalog().await?;
        let table = self.create_and_validate_table().await?;
        let coordinator = IcebergSinkCommitter {
            catalog,
            table,
            last_commit_epoch: 0,
            sink_id: self.param.sink_id,
            config: self.config.clone(),
            param: self.param.clone(),
            commit_retry_num: self.config.commit_retry_num,
            iceberg_compact_stat_sender,
        };
        if self.config.is_exactly_once.unwrap_or_default() {
            Ok(SinkCommitCoordinator::TwoPhase(Box::new(coordinator)))
        } else {
            Ok(SinkCommitCoordinator::SinglePhase(Box::new(coordinator)))
        }
    }
}
1151
/// Projection state used to drop the extra partition column before writing.
///
/// - `None`: no projection is needed.
/// - `Prepare(idx)`: holds the extra partition column index; the concrete
///   projection is built lazily from the first data chunk's schema.
/// - `Done(v)`: the resolved list of column indices to keep.
enum ProjectIdxVec {
    None,
    Prepare(usize),
    Done(Vec<usize>),
}
1163
// Shorthands for the concrete writer/builder stacks used below. All of them
// use the default location and file-name generators; the Parquet-backed ones
// write through `ParquetWriterBuilder`.
type DataFileWriterBuilderType =
    DataFileWriterBuilder<ParquetWriterBuilder, DefaultLocationGenerator, DefaultFileNameGenerator>;
// Classic position-delete files (Parquet).
type PositionDeleteFileWriterBuilderType = PositionDeleteFileWriterBuilder<
    ParquetWriterBuilder,
    DefaultLocationGenerator,
    DefaultFileNameGenerator,
>;
type PositionDeleteFileWriterType = PositionDeleteFileWriter<
    ParquetWriterBuilder,
    DefaultLocationGenerator,
    DefaultFileNameGenerator,
>;
// Deletion vectors (no Parquet writer involved; see `build_upsert`, which
// emits them in Puffin format).
type DeletionVectorWriterBuilderType =
    DeletionVectorWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>;
type DeletionVectorWriterType =
    DeletionVectorWriter<DefaultLocationGenerator, DefaultFileNameGenerator>;
// Equality-delete files keyed by the sink's primary-key column ids.
type EqualityDeleteFileWriterBuilderType = EqualityDeleteFileWriterBuilder<
    ParquetWriterBuilder,
    DefaultLocationGenerator,
    DefaultFileNameGenerator,
>;
1185
/// Builder for the position-delete representation: classic position-delete
/// files, or deletion vectors when the table format is V3 or newer (see the
/// `use_deletion_vectors` selection in `build_upsert`).
#[derive(Clone)]
enum PositionDeleteWriterBuilderType {
    PositionDelete(PositionDeleteFileWriterBuilderType),
    DeletionVector(DeletionVectorWriterBuilderType),
}
1191
/// Writer counterpart of `PositionDeleteWriterBuilderType`; holds whichever
/// concrete writer the builder produced.
enum PositionDeleteWriterType {
    PositionDelete(PositionDeleteFileWriterType),
    DeletionVector(DeletionVectorWriterType),
}
1196
1197#[async_trait]
1198impl IcebergWriterBuilder<Vec<PositionDeleteInput>> for PositionDeleteWriterBuilderType {
1199    type R = PositionDeleteWriterType;
1200
1201    async fn build(
1202        &self,
1203        partition_key: Option<iceberg::spec::PartitionKey>,
1204    ) -> iceberg::Result<Self::R> {
1205        match self {
1206            PositionDeleteWriterBuilderType::PositionDelete(builder) => Ok(
1207                PositionDeleteWriterType::PositionDelete(builder.build(partition_key).await?),
1208            ),
1209            PositionDeleteWriterBuilderType::DeletionVector(builder) => Ok(
1210                PositionDeleteWriterType::DeletionVector(builder.build(partition_key).await?),
1211            ),
1212        }
1213    }
1214}
1215
1216#[async_trait]
1217impl IcebergWriter<Vec<PositionDeleteInput>> for PositionDeleteWriterType {
1218    async fn write(&mut self, input: Vec<PositionDeleteInput>) -> iceberg::Result<()> {
1219        match self {
1220            PositionDeleteWriterType::PositionDelete(writer) => writer.write(input).await,
1221            PositionDeleteWriterType::DeletionVector(writer) => writer.write(input).await,
1222        }
1223    }
1224
1225    async fn close(&mut self) -> iceberg::Result<Vec<DataFile>> {
1226        match self {
1227            PositionDeleteWriterType::PositionDelete(writer) => writer.close().await,
1228            PositionDeleteWriterType::DeletionVector(writer) => writer.close().await,
1229        }
1230    }
1231}
1232
/// Cheaply cloneable wrapper that lets a single writer builder be shared by
/// several task writers via `Arc`.
#[derive(Clone)]
struct SharedIcebergWriterBuilder<B>(Arc<B>);
1235
#[async_trait]
impl<B: IcebergWriterBuilder> IcebergWriterBuilder for SharedIcebergWriterBuilder<B> {
    type R = B::R;

    /// Delegate directly to the wrapped builder.
    async fn build(
        &self,
        partition_key: Option<iceberg::spec::PartitionKey>,
    ) -> iceberg::Result<Self::R> {
        self.0.build(partition_key).await
    }
}
1247
/// Bundles an `IcebergWriterBuilder` with the table metadata needed to
/// construct partition-aware `TaskWriter`s.
#[derive(Clone)]
struct TaskWriterBuilderWrapper<B: IcebergWriterBuilder> {
    // Shared so every task writer built from this wrapper reuses one builder.
    inner: Arc<B>,
    // Fan out writes per partition; set when the table has partition fields.
    fanout_enabled: bool,
    schema: IcebergSchemaRef,
    partition_spec: PartitionSpecRef,
    // true: compute partition values from the data; false: the input already
    // carries precomputed partition values.
    compute_partition: bool,
}
1256
1257impl<B: IcebergWriterBuilder> TaskWriterBuilderWrapper<B> {
1258    fn new(
1259        inner: B,
1260        fanout_enabled: bool,
1261        schema: IcebergSchemaRef,
1262        partition_spec: PartitionSpecRef,
1263        compute_partition: bool,
1264    ) -> Self {
1265        Self {
1266            inner: Arc::new(inner),
1267            fanout_enabled,
1268            schema,
1269            partition_spec,
1270            compute_partition,
1271        }
1272    }
1273
1274    fn build(&self) -> iceberg::Result<TaskWriter<SharedIcebergWriterBuilder<B>>> {
1275        let partition_splitter = match (
1276            self.partition_spec.is_unpartitioned(),
1277            self.compute_partition,
1278        ) {
1279            (true, _) => None,
1280            (false, true) => Some(RecordBatchPartitionSplitter::try_new_with_computed_values(
1281                self.schema.clone(),
1282                self.partition_spec.clone(),
1283            )?),
1284            (false, false) => Some(
1285                RecordBatchPartitionSplitter::try_new_with_precomputed_values(
1286                    self.schema.clone(),
1287                    self.partition_spec.clone(),
1288                )?,
1289            ),
1290        };
1291
1292        Ok(TaskWriter::new_with_partition_splitter(
1293            SharedIcebergWriterBuilder(self.inner.clone()),
1294            self.fanout_enabled,
1295            self.schema.clone(),
1296            self.partition_spec.clone(),
1297            partition_splitter,
1298        ))
1299    }
1300}
1301
/// Two-state sink writer: construction arguments are held in `Created` until
/// the actual writer state (`Initialized`) is built.
pub enum IcebergSinkWriter {
    Created(IcebergSinkWriterArgs),
    Initialized(IcebergSinkWriterInner),
}
1306
/// Arguments captured by `IcebergSinkWriter::new`, consumed when the writer
/// transitions out of the `Created` state.
pub struct IcebergSinkWriterArgs {
    config: IcebergConfig,
    sink_param: SinkParam,
    writer_param: SinkWriterParam,
    // Primary-key column ids for upsert sinks; `None` for append-only.
    unique_column_ids: Option<Vec<usize>>,
}
1313
/// Fully-initialized writer state (the `Initialized` variant of
/// `IcebergSinkWriter`).
pub struct IcebergSinkWriterInner {
    // Dispatches between the append-only and upsert writer stacks.
    writer: IcebergWriterDispatch,
    // Arrow schema derived from the table's current Iceberg schema.
    arrow_schema: SchemaRef,
    // See comments on `IcebergWriterMetrics`.
    metrics: IcebergWriterMetrics,
    // State of iceberg table for this writer
    table: Table,
    // For chunk with extra partition column, we should remove this column before write.
    // This project index vec is used to avoid create project idx each time.
    project_idx_vec: ProjectIdxVec,
}
1325
/// The writer stack in use, selected by sink type.
///
/// `Append` writes data files only; `Upsert` wraps a delta writer that also
/// produces position/equality deletes. In both variants `writer` is the
/// currently active writer (`None` until built) and `writer_builder` is used
/// to (re)create it.
#[allow(clippy::type_complexity)]
enum IcebergWriterDispatch {
    Append {
        writer: Option<Box<dyn IcebergWriter>>,
        writer_builder:
            TaskWriterBuilderWrapper<MonitoredGeneralWriterBuilder<DataFileWriterBuilderType>>,
    },
    Upsert {
        writer: Option<Box<dyn IcebergWriter>>,
        writer_builder: TaskWriterBuilderWrapper<
            MonitoredGeneralWriterBuilder<
                DeltaWriterBuilder<
                    DataFileWriterBuilderType,
                    PositionDeleteWriterBuilderType,
                    EqualityDeleteFileWriterBuilderType,
                >,
            >,
        >,
        // Table schema extended with an extra non-null `op` column (Int32)
        // that the delta writer uses to distinguish inserts from deletes.
        arrow_schema_with_op_column: SchemaRef,
    },
}
1347
1348impl IcebergWriterDispatch {
1349    pub fn get_writer(&mut self) -> Option<&mut Box<dyn IcebergWriter>> {
1350        match self {
1351            IcebergWriterDispatch::Append { writer, .. }
1352            | IcebergWriterDispatch::Upsert { writer, .. } => writer.as_mut(),
1353        }
1354    }
1355}
1356
/// Per-writer metric handles, label-guarded so the labels are cleaned from
/// the registry when the writer is dropped.
pub struct IcebergWriterMetrics {
    // NOTE: These 2 metrics are not used directly by us, but only kept for lifecycle management.
    // They are actually used in `PrometheusWriterBuilder`:
    //     WriterMetrics::new(write_qps.deref().clone(), write_latency.deref().clone())
    // We keep them here to let the guard cleans the labels from metrics registry when dropped
    _write_qps: LabelGuardedIntCounter,
    _write_latency: LabelGuardedHistogram,
    // Counter backing the `iceberg_write_bytes` metric.
    write_bytes: LabelGuardedIntCounter,
}
1366
1367impl IcebergSinkWriter {
1368    pub fn new(
1369        config: IcebergConfig,
1370        sink_param: SinkParam,
1371        writer_param: SinkWriterParam,
1372        unique_column_ids: Option<Vec<usize>>,
1373    ) -> Self {
1374        Self::Created(IcebergSinkWriterArgs {
1375            config,
1376            sink_param,
1377            writer_param,
1378            unique_column_ids,
1379        })
1380    }
1381}
1382
1383impl IcebergSinkWriterInner {
    /// Build the append-only writer state: a monitored data-file writer stack
    /// (Parquet -> rolling file -> data-file -> task writer) over the table's
    /// current schema and default partition spec.
    fn build_append_only(
        config: &IcebergConfig,
        table: Table,
        writer_param: &SinkWriterParam,
    ) -> Result<Self> {
        let SinkWriterParam {
            extra_partition_col_idx,
            actor_id,
            sink_id,
            sink_name,
            ..
        } = writer_param;
        // Labels identifying this writer instance in the metrics registry.
        let metrics_labels = [
            &actor_id.to_string(),
            &sink_id.to_string(),
            sink_name.as_str(),
        ];

        // Metrics
        let write_qps = GLOBAL_SINK_METRICS
            .iceberg_write_qps
            .with_guarded_label_values(&metrics_labels);
        let write_latency = GLOBAL_SINK_METRICS
            .iceberg_write_latency
            .with_guarded_label_values(&metrics_labels);
        // # TODO
        // Unused. Add this metrics later.
        let _rolling_unflushed_data_file = GLOBAL_SINK_METRICS
            .iceberg_rolling_unflushed_data_file
            .with_guarded_label_values(&metrics_labels);
        let write_bytes = GLOBAL_SINK_METRICS
            .iceberg_write_bytes
            .with_guarded_label_values(&metrics_labels);

        let schema = table.metadata().current_schema();
        let partition_spec = table.metadata().default_partition_spec();
        // Fan out per partition only when the table actually has partition fields.
        let fanout_enabled = !partition_spec.fields().is_empty();
        // To avoid duplicate file name, each time the sink created will generate a unique uuid as file name suffix.
        let unique_uuid_suffix = Uuid::now_v7();

        let parquet_writer_properties = WriterProperties::builder()
            .set_compression(config.get_parquet_compression())
            .set_max_row_group_size(config.write_parquet_max_row_group_rows())
            .set_created_by(PARQUET_CREATED_BY.to_owned())
            .build();

        // Roll to a new file once the configured target size (MB) is reached.
        let parquet_writer_builder =
            ParquetWriterBuilder::new(parquet_writer_properties, schema.clone());
        let rolling_builder = RollingFileWriterBuilder::new(
            parquet_writer_builder,
            (config.target_file_size_mb() * 1024 * 1024) as usize,
            table.file_io().clone(),
            DefaultLocationGenerator::new(table.metadata().clone())
                .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
            DefaultFileNameGenerator::new(
                writer_param.actor_id.to_string(),
                Some(unique_uuid_suffix.to_string()),
                iceberg::spec::DataFileFormat::Parquet,
            ),
        );
        let data_file_builder = DataFileWriterBuilder::new(rolling_builder);
        // Wrap with QPS/latency instrumentation before handing to the task writer.
        let monitored_builder = MonitoredGeneralWriterBuilder::new(
            data_file_builder,
            write_qps.clone(),
            write_latency.clone(),
        );
        let writer_builder = TaskWriterBuilderWrapper::new(
            monitored_builder,
            fanout_enabled,
            schema.clone(),
            partition_spec.clone(),
            true,
        );
        // Eagerly build the first writer so the dispatch starts ready to write.
        let inner_writer = Some(Box::new(
            writer_builder
                .build()
                .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
        ) as Box<dyn IcebergWriter>);
        Ok(Self {
            arrow_schema: Arc::new(
                schema_to_arrow_schema(table.metadata().current_schema())
                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
            ),
            metrics: IcebergWriterMetrics {
                _write_qps: write_qps,
                _write_latency: write_latency,
                write_bytes,
            },
            writer: IcebergWriterDispatch::Append {
                writer: inner_writer,
                writer_builder,
            },
            table,
            project_idx_vec: {
                // Defer building the projection until the first chunk's
                // schema is known (see `ProjectIdxVec`).
                if let Some(extra_partition_col_idx) = extra_partition_col_idx {
                    ProjectIdxVec::Prepare(*extra_partition_col_idx)
                } else {
                    ProjectIdxVec::None
                }
            },
        })
    }
1486
1487    fn build_upsert(
1488        config: &IcebergConfig,
1489        table: Table,
1490        unique_column_ids: Vec<usize>,
1491        writer_param: &SinkWriterParam,
1492    ) -> Result<Self> {
1493        let SinkWriterParam {
1494            extra_partition_col_idx,
1495            actor_id,
1496            sink_id,
1497            sink_name,
1498            ..
1499        } = writer_param;
1500        let metrics_labels = [
1501            &actor_id.to_string(),
1502            &sink_id.to_string(),
1503            sink_name.as_str(),
1504        ];
1505        let unique_column_ids: Vec<_> = unique_column_ids.into_iter().map(|id| id as i32).collect();
1506
1507        // Metrics
1508        let write_qps = GLOBAL_SINK_METRICS
1509            .iceberg_write_qps
1510            .with_guarded_label_values(&metrics_labels);
1511        let write_latency = GLOBAL_SINK_METRICS
1512            .iceberg_write_latency
1513            .with_guarded_label_values(&metrics_labels);
1514        // # TODO
1515        // Unused. Add this metrics later.
1516        let _rolling_unflushed_data_file = GLOBAL_SINK_METRICS
1517            .iceberg_rolling_unflushed_data_file
1518            .with_guarded_label_values(&metrics_labels);
1519        let write_bytes = GLOBAL_SINK_METRICS
1520            .iceberg_write_bytes
1521            .with_guarded_label_values(&metrics_labels);
1522
1523        // Determine the schema id and partition spec id
1524        let schema = table.metadata().current_schema();
1525        let partition_spec = table.metadata().default_partition_spec();
1526        let fanout_enabled = !partition_spec.fields().is_empty();
1527        let use_deletion_vectors = table.metadata().format_version() >= FormatVersion::V3;
1528
1529        // To avoid duplicate file name, each time the sink created will generate a unique uuid as file name suffix.
1530        let unique_uuid_suffix = Uuid::now_v7();
1531
1532        let parquet_writer_properties = WriterProperties::builder()
1533            .set_compression(config.get_parquet_compression())
1534            .set_max_row_group_size(config.write_parquet_max_row_group_rows())
1535            .set_created_by(PARQUET_CREATED_BY.to_owned())
1536            .build();
1537
1538        let data_file_builder = {
1539            let parquet_writer_builder =
1540                ParquetWriterBuilder::new(parquet_writer_properties.clone(), schema.clone());
1541            let rolling_writer_builder = RollingFileWriterBuilder::new(
1542                parquet_writer_builder,
1543                (config.target_file_size_mb() * 1024 * 1024) as usize,
1544                table.file_io().clone(),
1545                DefaultLocationGenerator::new(table.metadata().clone())
1546                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1547                DefaultFileNameGenerator::new(
1548                    writer_param.actor_id.to_string(),
1549                    Some(unique_uuid_suffix.to_string()),
1550                    iceberg::spec::DataFileFormat::Parquet,
1551                ),
1552            );
1553            DataFileWriterBuilder::new(rolling_writer_builder)
1554        };
1555        let position_delete_builder = if use_deletion_vectors {
1556            let location_generator = DefaultLocationGenerator::new(table.metadata().clone())
1557                .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
1558            PositionDeleteWriterBuilderType::DeletionVector(DeletionVectorWriterBuilder::new(
1559                table.file_io().clone(),
1560                location_generator,
1561                DefaultFileNameGenerator::new(
1562                    writer_param.actor_id.to_string(),
1563                    Some(format!("delvec-{}", unique_uuid_suffix)),
1564                    iceberg::spec::DataFileFormat::Puffin,
1565                ),
1566            ))
1567        } else {
1568            let parquet_writer_builder = ParquetWriterBuilder::new(
1569                parquet_writer_properties.clone(),
1570                POSITION_DELETE_SCHEMA.clone().into(),
1571            );
1572            let rolling_writer_builder = RollingFileWriterBuilder::new(
1573                parquet_writer_builder,
1574                (config.target_file_size_mb() * 1024 * 1024) as usize,
1575                table.file_io().clone(),
1576                DefaultLocationGenerator::new(table.metadata().clone())
1577                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1578                DefaultFileNameGenerator::new(
1579                    writer_param.actor_id.to_string(),
1580                    Some(format!("pos-del-{}", unique_uuid_suffix)),
1581                    iceberg::spec::DataFileFormat::Parquet,
1582                ),
1583            );
1584            PositionDeleteWriterBuilderType::PositionDelete(PositionDeleteFileWriterBuilder::new(
1585                rolling_writer_builder,
1586            ))
1587        };
1588        let equality_delete_builder = {
1589            let eq_del_config = EqualityDeleteWriterConfig::new(
1590                unique_column_ids.clone(),
1591                table.metadata().current_schema().clone(),
1592            )
1593            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
1594            let parquet_writer_builder = ParquetWriterBuilder::new(
1595                parquet_writer_properties,
1596                Arc::new(
1597                    arrow_schema_to_schema(eq_del_config.projected_arrow_schema_ref())
1598                        .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1599                ),
1600            );
1601            let rolling_writer_builder = RollingFileWriterBuilder::new(
1602                parquet_writer_builder,
1603                (config.target_file_size_mb() * 1024 * 1024) as usize,
1604                table.file_io().clone(),
1605                DefaultLocationGenerator::new(table.metadata().clone())
1606                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1607                DefaultFileNameGenerator::new(
1608                    writer_param.actor_id.to_string(),
1609                    Some(format!("eq-del-{}", unique_uuid_suffix)),
1610                    iceberg::spec::DataFileFormat::Parquet,
1611                ),
1612            );
1613
1614            EqualityDeleteFileWriterBuilder::new(rolling_writer_builder, eq_del_config)
1615        };
1616        let delta_builder = DeltaWriterBuilder::new(
1617            data_file_builder,
1618            position_delete_builder,
1619            equality_delete_builder,
1620            unique_column_ids,
1621            schema.clone(),
1622        );
1623        let original_arrow_schema = Arc::new(
1624            schema_to_arrow_schema(table.metadata().current_schema())
1625                .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1626        );
1627        let schema_with_extra_op_column = {
1628            let mut new_fields = original_arrow_schema.fields().iter().cloned().collect_vec();
1629            new_fields.push(Arc::new(ArrowField::new(
1630                "op".to_owned(),
1631                ArrowDataType::Int32,
1632                false,
1633            )));
1634            Arc::new(ArrowSchema::new(new_fields))
1635        };
1636        let writer_builder = TaskWriterBuilderWrapper::new(
1637            MonitoredGeneralWriterBuilder::new(
1638                delta_builder,
1639                write_qps.clone(),
1640                write_latency.clone(),
1641            ),
1642            fanout_enabled,
1643            schema.clone(),
1644            partition_spec.clone(),
1645            true,
1646        );
1647        let inner_writer = Some(Box::new(
1648            writer_builder
1649                .build()
1650                .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1651        ) as Box<dyn IcebergWriter>);
1652        Ok(Self {
1653            arrow_schema: original_arrow_schema,
1654            metrics: IcebergWriterMetrics {
1655                _write_qps: write_qps,
1656                _write_latency: write_latency,
1657                write_bytes,
1658            },
1659            table,
1660            writer: IcebergWriterDispatch::Upsert {
1661                writer: inner_writer,
1662                writer_builder,
1663                arrow_schema_with_op_column: schema_with_extra_op_column,
1664            },
1665            project_idx_vec: {
1666                if let Some(extra_partition_col_idx) = extra_partition_col_idx {
1667                    ProjectIdxVec::Prepare(*extra_partition_col_idx)
1668                } else {
1669                    ProjectIdxVec::None
1670                }
1671            },
1672        })
1673    }
1674}
1675
#[async_trait]
impl SinkWriter for IcebergSinkWriter {
    /// Per-checkpoint metadata handed to the commit coordinator; `None` when
    /// there is nothing to commit for this epoch.
    type CommitMetadata = Option<SinkMetadata>;

    /// Begin a new epoch
    async fn begin_epoch(&mut self, _epoch: u64) -> Result<()> {
        // Lazy initialization: while in the `Created` state we only hold the
        // construction arguments. On the first epoch we validate the table and
        // build the real writer, then transition to `Initialized`. Subsequent
        // epochs are no-ops.
        let Self::Created(args) = self else {
            return Ok(());
        };

        let table = create_and_validate_table_impl(&args.config, &args.sink_param).await?;
        // Presence of unique column ids selects upsert mode; otherwise append-only.
        let inner = match &args.unique_column_ids {
            Some(unique_column_ids) => IcebergSinkWriterInner::build_upsert(
                &args.config,
                table,
                unique_column_ids.clone(),
                &args.writer_param,
            )?,
            None => {
                IcebergSinkWriterInner::build_append_only(&args.config, table, &args.writer_param)?
            }
        };

        *self = IcebergSinkWriter::Initialized(inner);
        Ok(())
    }

    /// Write a stream chunk to sink
    async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> {
        let Self::Initialized(inner) = self else {
            unreachable!("IcebergSinkWriter should be initialized before barrier");
        };

        // Try to build writer if it's None.
        // (The writer is taken and closed on each checkpoint barrier, so the
        // first chunk after a checkpoint may need to rebuild it here.)
        match &mut inner.writer {
            IcebergWriterDispatch::Append {
                writer,
                writer_builder,
            } => {
                if writer.is_none() {
                    *writer = Some(Box::new(
                        writer_builder
                            .build()
                            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
                    ));
                }
            }
            IcebergWriterDispatch::Upsert {
                writer,
                writer_builder,
                ..
            } => {
                if writer.is_none() {
                    *writer = Some(Box::new(
                        writer_builder
                            .build()
                            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
                    ));
                }
            }
        };

        // Process the chunk.
        let (mut chunk, ops) = chunk.compact_vis().into_parts();
        // Project away the extra partition column if one was configured. The
        // projection index vector is computed once (`Prepare` -> `Done`) and
        // reused for all later chunks.
        match &mut inner.project_idx_vec {
            ProjectIdxVec::None => {}
            ProjectIdxVec::Prepare(idx) => {
                if *idx >= chunk.columns().len() {
                    return Err(SinkError::Iceberg(anyhow!(
                        "invalid extra partition column index {}",
                        idx
                    )));
                }
                // All column indices except the extra partition column.
                let project_idx_vec = (0..*idx)
                    .chain(*idx + 1..chunk.columns().len())
                    .collect_vec();
                chunk = chunk.project(&project_idx_vec);
                inner.project_idx_vec = ProjectIdxVec::Done(project_idx_vec);
            }
            ProjectIdxVec::Done(idx_vec) => {
                chunk = chunk.project(idx_vec);
            }
        }
        // Nothing to write for an empty chunk.
        if ops.is_empty() {
            return Ok(());
        }
        let write_batch_size = chunk.estimated_heap_size();
        // Convert the chunk into an arrow `RecordBatch` in the shape the writer expects.
        let batch = match &inner.writer {
            IcebergWriterDispatch::Append { .. } => {
                // separate out insert chunk
                // Append-only mode: keep only `Insert` rows (updates/deletes are dropped).
                let filters =
                    chunk.visibility() & ops.iter().map(|op| *op == Op::Insert).collect::<Bitmap>();
                chunk.set_visibility(filters);
                IcebergArrowConvert
                    .to_record_batch(inner.arrow_schema.clone(), &chunk.compact_vis())
                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?
            }
            IcebergWriterDispatch::Upsert {
                arrow_schema_with_op_column,
                ..
            } => {
                // Upsert mode: append an extra Int32 "op" column encoding
                // insert/delete so the delta writer can route each row.
                let chunk = IcebergArrowConvert
                    .to_record_batch(inner.arrow_schema.clone(), &chunk)
                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
                let ops = Arc::new(Int32Array::from(
                    ops.iter()
                        .map(|op| match op {
                            Op::UpdateInsert | Op::Insert => INSERT_OP,
                            Op::UpdateDelete | Op::Delete => DELETE_OP,
                        })
                        .collect_vec(),
                ));
                let mut columns = chunk.columns().to_vec();
                columns.push(ops);
                RecordBatch::try_new(arrow_schema_with_op_column.clone(), columns)
                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?
            }
        };

        // The writer was ensured to exist at the top of this function.
        let writer = inner.writer.get_writer().unwrap();
        writer
            .write(batch)
            .instrument_await("iceberg_write")
            .await
            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
        inner.metrics.write_bytes.inc_by(write_batch_size as _);
        Ok(())
    }

    /// Receive a barrier and mark the end of current epoch. When `is_checkpoint` is true, the sink
    /// writer should commit the current epoch.
    async fn barrier(&mut self, is_checkpoint: bool) -> Result<Option<SinkMetadata>> {
        let Self::Initialized(inner) = self else {
            unreachable!("IcebergSinkWriter should be initialized before barrier");
        };

        // Skip it if not checkpoint
        if !is_checkpoint {
            return Ok(None);
        }

        // Close the current writer (flushing its data files) and eagerly build a
        // replacement for the next epoch. Both dispatch arms follow the same protocol.
        let close_result = match &mut inner.writer {
            IcebergWriterDispatch::Append {
                writer,
                writer_builder,
            } => {
                let close_result = match writer.take() {
                    Some(mut writer) => {
                        Some(writer.close().instrument_await("iceberg_close").await)
                    }
                    _ => None,
                };
                match writer_builder.build() {
                    Ok(new_writer) => {
                        *writer = Some(Box::new(new_writer));
                    }
                    _ => {
                        // In this case, the writer is closed and we can't build a new writer. But we can't return the error
                        // here because current writer may close successfully. So we just log the error.
                        warn!("Failed to build new writer after close");
                    }
                }
                close_result
            }
            IcebergWriterDispatch::Upsert {
                writer,
                writer_builder,
                ..
            } => {
                let close_result = match writer.take() {
                    Some(mut writer) => {
                        Some(writer.close().instrument_await("iceberg_close").await)
                    }
                    _ => None,
                };
                match writer_builder.build() {
                    Ok(new_writer) => {
                        *writer = Some(Box::new(new_writer));
                    }
                    _ => {
                        // In this case, the writer is closed and we can't build a new writer. But we can't return the error
                        // here because current writer may close successfully. So we just log the error.
                        warn!("Failed to build new writer after close");
                    }
                }
                close_result
            }
        };

        // Package the data files produced by the closed writer into sink metadata
        // for the commit coordinator.
        match close_result {
            Some(Ok(result)) => {
                let format_version = inner.table.metadata().format_version();
                let partition_type = inner.table.metadata().default_partition_type();
                let data_files = result
                    .into_iter()
                    .map(|f| {
                        // Truncate large column statistics BEFORE serialization
                        let truncated = truncate_datafile(f);
                        SerializedDataFile::try_from(truncated, partition_type, format_version)
                            .map_err(|err| SinkError::Iceberg(anyhow!(err)))
                    })
                    .collect::<Result<Vec<_>>>()?;
                Ok(Some(SinkMetadata::try_from(&IcebergCommitResult {
                    data_files,
                    schema_id: inner.table.metadata().current_schema_id(),
                    partition_spec_id: inner.table.metadata().default_partition_spec_id(),
                })?))
            }
            Some(Err(err)) => Err(SinkError::Iceberg(anyhow!(err))),
            None => Err(SinkError::Iceberg(anyhow!("No writer to close"))),
        }
    }
}
1889
// JSON object keys used when (de)serializing `IcebergCommitResult` into the
// sink metadata payload (see the `TryFrom` impls below).
const SCHEMA_ID: &str = "schema_id";
const PARTITION_SPEC_ID: &str = "partition_spec_id";
const DATA_FILES: &str = "data_files";

/// Maximum size for column statistics (min/max values) in bytes.
/// Column statistics larger than this will be truncated to avoid metadata bloat.
/// This is especially important for large fields like JSONB, TEXT, BINARY, etc.
///
/// Fix for large column statistics in `DataFile` metadata that can cause OOM errors.
/// We truncate at the `DataFile` level (before serialization) by directly modifying
/// the public `lower_bounds` and `upper_bounds` fields.
///
/// This prevents metadata from ballooning to gigabytes when dealing with large
/// JSONB, TEXT, or BINARY fields, while still preserving statistics for small fields
/// that benefit from query optimization.
const MAX_COLUMN_STAT_SIZE: usize = 10240; // 10KB
1906
1907/// Truncate large column statistics from `DataFile` BEFORE serialization.
1908///
1909/// This function directly modifies `DataFile`'s `lower_bounds` and `upper_bounds`
1910/// to remove entries that exceed `MAX_COLUMN_STAT_SIZE`.
1911///
1912/// # Arguments
1913/// * `data_file` - A `DataFile` to process
1914///
1915/// # Returns
1916/// The modified `DataFile` with large statistics truncated
1917fn truncate_datafile(mut data_file: DataFile) -> DataFile {
1918    // Process lower_bounds - remove entries with large values
1919    data_file.lower_bounds.retain(|field_id, datum| {
1920        // Use to_bytes() to get the actual binary size without JSON serialization overhead
1921        let size = match datum.to_bytes() {
1922            Ok(bytes) => bytes.len(),
1923            Err(_) => 0,
1924        };
1925
1926        if size > MAX_COLUMN_STAT_SIZE {
1927            tracing::debug!(
1928                field_id = field_id,
1929                size = size,
1930                "Truncating large lower_bound statistic"
1931            );
1932            return false;
1933        }
1934        true
1935    });
1936
1937    // Process upper_bounds - remove entries with large values
1938    data_file.upper_bounds.retain(|field_id, datum| {
1939        // Use to_bytes() to get the actual binary size without JSON serialization overhead
1940        let size = match datum.to_bytes() {
1941            Ok(bytes) => bytes.len(),
1942            Err(_) => 0,
1943        };
1944
1945        if size > MAX_COLUMN_STAT_SIZE {
1946            tracing::debug!(
1947                field_id = field_id,
1948                size = size,
1949                "Truncating large upper_bound statistic"
1950            );
1951            return false;
1952        }
1953        true
1954    });
1955
1956    data_file
1957}
1958
/// In-memory form of the metadata one sink writer produces per checkpoint:
/// the data files it wrote plus the schema / partition-spec ids they were
/// written against.
#[derive(Default, Clone)]
struct IcebergCommitResult {
    // Schema id the data files were written with; commit validates it against
    // the table's current schema (see `reload_table`).
    schema_id: i32,
    // Partition spec id the data files were partitioned by.
    partition_spec_id: i32,
    // Data files produced in this checkpoint, in serialized (JSON-friendly) form.
    data_files: Vec<SerializedDataFile>,
}
1965
1966impl IcebergCommitResult {
1967    fn try_from(value: &SinkMetadata) -> Result<Self> {
1968        if let Some(Serialized(v)) = &value.metadata {
1969            let mut values = if let serde_json::Value::Object(v) =
1970                serde_json::from_slice::<serde_json::Value>(&v.metadata)
1971                    .context("Can't parse iceberg sink metadata")?
1972            {
1973                v
1974            } else {
1975                bail!("iceberg sink metadata should be an object");
1976            };
1977
1978            let schema_id;
1979            if let Some(serde_json::Value::Number(value)) = values.remove(SCHEMA_ID) {
1980                schema_id = value
1981                    .as_u64()
1982                    .ok_or_else(|| anyhow!("schema_id should be a u64"))?;
1983            } else {
1984                bail!("iceberg sink metadata should have schema_id");
1985            }
1986
1987            let partition_spec_id;
1988            if let Some(serde_json::Value::Number(value)) = values.remove(PARTITION_SPEC_ID) {
1989                partition_spec_id = value
1990                    .as_u64()
1991                    .ok_or_else(|| anyhow!("partition_spec_id should be a u64"))?;
1992            } else {
1993                bail!("iceberg sink metadata should have partition_spec_id");
1994            }
1995
1996            let data_files: Vec<SerializedDataFile>;
1997            if let serde_json::Value::Array(values) = values
1998                .remove(DATA_FILES)
1999                .ok_or_else(|| anyhow!("iceberg sink metadata should have data_files object"))?
2000            {
2001                data_files = values
2002                    .into_iter()
2003                    .map(from_value::<SerializedDataFile>)
2004                    .collect::<std::result::Result<_, _>>()
2005                    .unwrap();
2006            } else {
2007                bail!("iceberg sink metadata should have data_files object");
2008            }
2009
2010            Ok(Self {
2011                schema_id: schema_id as i32,
2012                partition_spec_id: partition_spec_id as i32,
2013                data_files,
2014            })
2015        } else {
2016            bail!("Can't create iceberg sink write result from empty data!")
2017        }
2018    }
2019
2020    fn try_from_serialized_bytes(value: Vec<u8>) -> Result<Self> {
2021        let mut values = if let serde_json::Value::Object(value) =
2022            serde_json::from_slice::<serde_json::Value>(&value)
2023                .context("Can't parse iceberg sink metadata")?
2024        {
2025            value
2026        } else {
2027            bail!("iceberg sink metadata should be an object");
2028        };
2029
2030        let schema_id;
2031        if let Some(serde_json::Value::Number(value)) = values.remove(SCHEMA_ID) {
2032            schema_id = value
2033                .as_u64()
2034                .ok_or_else(|| anyhow!("schema_id should be a u64"))?;
2035        } else {
2036            bail!("iceberg sink metadata should have schema_id");
2037        }
2038
2039        let partition_spec_id;
2040        if let Some(serde_json::Value::Number(value)) = values.remove(PARTITION_SPEC_ID) {
2041            partition_spec_id = value
2042                .as_u64()
2043                .ok_or_else(|| anyhow!("partition_spec_id should be a u64"))?;
2044        } else {
2045            bail!("iceberg sink metadata should have partition_spec_id");
2046        }
2047
2048        let data_files: Vec<SerializedDataFile>;
2049        if let serde_json::Value::Array(values) = values
2050            .remove(DATA_FILES)
2051            .ok_or_else(|| anyhow!("iceberg sink metadata should have data_files object"))?
2052        {
2053            data_files = values
2054                .into_iter()
2055                .map(from_value::<SerializedDataFile>)
2056                .collect::<std::result::Result<_, _>>()
2057                .unwrap();
2058        } else {
2059            bail!("iceberg sink metadata should have data_files object");
2060        }
2061
2062        Ok(Self {
2063            schema_id: schema_id as i32,
2064            partition_spec_id: partition_spec_id as i32,
2065            data_files,
2066        })
2067    }
2068}
2069
2070impl<'a> TryFrom<&'a IcebergCommitResult> for SinkMetadata {
2071    type Error = SinkError;
2072
2073    fn try_from(value: &'a IcebergCommitResult) -> std::result::Result<SinkMetadata, Self::Error> {
2074        let json_data_files = serde_json::Value::Array(
2075            value
2076                .data_files
2077                .iter()
2078                .map(serde_json::to_value)
2079                .collect::<std::result::Result<Vec<serde_json::Value>, _>>()
2080                .context("Can't serialize data files to json")?,
2081        );
2082        let json_value = serde_json::Value::Object(
2083            vec![
2084                (
2085                    SCHEMA_ID.to_owned(),
2086                    serde_json::Value::Number(value.schema_id.into()),
2087                ),
2088                (
2089                    PARTITION_SPEC_ID.to_owned(),
2090                    serde_json::Value::Number(value.partition_spec_id.into()),
2091                ),
2092                (DATA_FILES.to_owned(), json_data_files),
2093            ]
2094            .into_iter()
2095            .collect(),
2096        );
2097        Ok(SinkMetadata {
2098            metadata: Some(Serialized(SerializedMetadata {
2099                metadata: serde_json::to_vec(&json_value)
2100                    .context("Can't serialize iceberg sink metadata")?,
2101            })),
2102        })
2103    }
2104}
2105
2106impl TryFrom<IcebergCommitResult> for Vec<u8> {
2107    type Error = SinkError;
2108
2109    fn try_from(value: IcebergCommitResult) -> std::result::Result<Vec<u8>, Self::Error> {
2110        let json_data_files = serde_json::Value::Array(
2111            value
2112                .data_files
2113                .iter()
2114                .map(serde_json::to_value)
2115                .collect::<std::result::Result<Vec<serde_json::Value>, _>>()
2116                .context("Can't serialize data files to json")?,
2117        );
2118        let json_value = serde_json::Value::Object(
2119            vec![
2120                (
2121                    SCHEMA_ID.to_owned(),
2122                    serde_json::Value::Number(value.schema_id.into()),
2123                ),
2124                (
2125                    PARTITION_SPEC_ID.to_owned(),
2126                    serde_json::Value::Number(value.partition_spec_id.into()),
2127                ),
2128                (DATA_FILES.to_owned(), json_data_files),
2129            ]
2130            .into_iter()
2131            .collect(),
2132        );
2133        Ok(serde_json::to_vec(&json_value).context("Can't serialize iceberg sink metadata")?)
2134    }
2135}
/// Coordinator-side committer: turns the data files reported by the sink
/// writers into iceberg table commits (single-phase or two-phase).
pub struct IcebergSinkCommitter {
    // Catalog used to (re)load the table before committing.
    catalog: Arc<dyn Catalog>,
    // Cached table handle; refreshed via `reload_table` before each commit.
    table: Table,
    pub last_commit_epoch: u64,
    pub(crate) sink_id: SinkId,
    pub(crate) config: IcebergConfig,
    pub(crate) param: SinkParam,
    // How many times a failed commit is retried (used by `commit_data_impl`).
    commit_retry_num: u32,
    // NOTE(review): presumably notifies the iceberg compaction scheduler after
    // commits when compaction is enabled — confirm against the receiver side.
    pub(crate) iceberg_compact_stat_sender: Option<UnboundedSender<IcebergSinkCompactionUpdate>>,
}
2146
2147impl IcebergSinkCommitter {
2148    // Reload table and guarantee current schema_id and partition_spec_id matches
2149    // given `schema_id` and `partition_spec_id`
2150    async fn reload_table(
2151        catalog: &dyn Catalog,
2152        table_ident: &TableIdent,
2153        schema_id: i32,
2154        partition_spec_id: i32,
2155    ) -> Result<Table> {
2156        let table = catalog
2157            .load_table(table_ident)
2158            .await
2159            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
2160        if table.metadata().current_schema_id() != schema_id {
2161            return Err(SinkError::Iceberg(anyhow!(
2162                "Schema evolution not supported, expect schema id {}, but got {}",
2163                schema_id,
2164                table.metadata().current_schema_id()
2165            )));
2166        }
2167        if table.metadata().default_partition_spec_id() != partition_spec_id {
2168            return Err(SinkError::Iceberg(anyhow!(
2169                "Partition evolution not supported, expect partition spec id {}, but got {}",
2170                partition_spec_id,
2171                table.metadata().default_partition_spec_id()
2172            )));
2173        }
2174        Ok(table)
2175    }
2176}
2177
2178#[async_trait]
2179impl SinglePhaseCommitCoordinator for IcebergSinkCommitter {
2180    async fn init(&mut self) -> Result<()> {
2181        tracing::info!(
2182            sink_id = %self.param.sink_id,
2183            "Iceberg sink coordinator initialized",
2184        );
2185
2186        Ok(())
2187    }
2188
2189    async fn commit_data(&mut self, epoch: u64, metadata: Vec<SinkMetadata>) -> Result<()> {
2190        tracing::debug!("Starting iceberg direct commit in epoch {epoch}");
2191
2192        if metadata.is_empty() {
2193            tracing::debug!(?epoch, "No datafile to commit");
2194            return Ok(());
2195        }
2196
2197        // Commit data if present
2198        if let Some((write_results, snapshot_id)) = self.pre_commit_inner(epoch, metadata)? {
2199            self.commit_data_impl(epoch, write_results, snapshot_id)
2200                .await?;
2201        }
2202
2203        Ok(())
2204    }
2205
2206    async fn commit_schema_change(
2207        &mut self,
2208        epoch: u64,
2209        schema_change: PbSinkSchemaChange,
2210    ) -> Result<()> {
2211        tracing::info!(
2212            "Committing schema change {:?} in epoch {}",
2213            schema_change,
2214            epoch
2215        );
2216        self.commit_schema_change_impl(schema_change).await?;
2217        tracing::info!("Successfully committed schema change in epoch {}", epoch);
2218
2219        Ok(())
2220    }
2221}
2222
#[async_trait]
impl TwoPhaseCommitCoordinator for IcebergSinkCommitter {
    async fn init(&mut self) -> Result<()> {
        tracing::info!(
            sink_id = %self.param.sink_id,
            "Iceberg sink coordinator initialized",
        );

        Ok(())
    }

    /// Phase one: validate and serialize the writers' results into an opaque
    /// payload that is persisted before the actual commit. Payload layout:
    /// one serialized `IcebergCommitResult` per parallelism, followed by the
    /// pre-generated snapshot id as 8 little-endian bytes.
    async fn pre_commit(
        &mut self,
        epoch: u64,
        metadata: Vec<SinkMetadata>,
        _schema_change: Option<PbSinkSchemaChange>,
    ) -> Result<Option<Vec<u8>>> {
        tracing::debug!("Starting iceberg pre commit in epoch {epoch}");

        let (write_results, snapshot_id) = match self.pre_commit_inner(epoch, metadata)? {
            Some((write_results, snapshot_id)) => (write_results, snapshot_id),
            None => {
                tracing::debug!(?epoch, "no data to pre commit");
                return Ok(None);
            }
        };

        // Serialize each parallelism's result individually.
        let mut write_results_bytes = Vec::new();
        for each_parallelism_write_result in write_results {
            let each_parallelism_write_result_bytes: Vec<u8> =
                each_parallelism_write_result.try_into()?;
            write_results_bytes.push(each_parallelism_write_result_bytes);
        }

        // The snapshot id goes last; `commit_data` pops it back off.
        let snapshot_id_bytes: Vec<u8> = snapshot_id.to_le_bytes().to_vec();
        write_results_bytes.push(snapshot_id_bytes);

        let pre_commit_metadata_bytes: Vec<u8> = serialize_metadata(write_results_bytes);
        Ok(Some(pre_commit_metadata_bytes))
    }

    /// Phase two: decode the payload produced by `pre_commit` and commit it.
    /// Idempotent: if the pre-generated snapshot id is already present in the
    /// iceberg table (i.e. a previous attempt succeeded), the commit is skipped.
    async fn commit_data(&mut self, epoch: u64, commit_metadata: Vec<u8>) -> Result<()> {
        tracing::debug!("Starting iceberg commit in epoch {epoch}");

        if commit_metadata.is_empty() {
            tracing::debug!(?epoch, "No datafile to commit");
            return Ok(());
        }

        // Deserialize commit metadata
        let mut payload = deserialize_metadata(commit_metadata);
        if payload.is_empty() {
            return Err(SinkError::Iceberg(anyhow!(
                "Invalid commit metadata: empty payload"
            )));
        }

        // Last element is snapshot_id
        let snapshot_id_bytes = payload.pop().ok_or_else(|| {
            SinkError::Iceberg(anyhow!("Invalid commit metadata: missing snapshot_id"))
        })?;
        let snapshot_id = i64::from_le_bytes(
            snapshot_id_bytes
                .try_into()
                .map_err(|_| SinkError::Iceberg(anyhow!("Invalid snapshot id bytes")))?,
        );

        // Remaining elements are write_results
        let write_results = payload
            .into_iter()
            .map(IcebergCommitResult::try_from_serialized_bytes)
            .collect::<Result<Vec<_>>>()?;

        // Exactly-once guard: skip the commit if this snapshot id already landed.
        let snapshot_committed = self
            .is_snapshot_id_in_iceberg(&self.config, snapshot_id)
            .await?;

        if snapshot_committed {
            tracing::info!(
                "Snapshot id {} already committed in iceberg table, skip committing again.",
                snapshot_id
            );
            return Ok(());
        }

        self.commit_data_impl(epoch, write_results, snapshot_id)
            .await
    }

    /// Apply a schema change, skipping if it was already applied (idempotent
    /// under coordinator retries).
    async fn commit_schema_change(
        &mut self,
        epoch: u64,
        schema_change: PbSinkSchemaChange,
    ) -> Result<()> {
        let schema_updated = self.check_schema_change_applied(&schema_change)?;
        if schema_updated {
            tracing::info!("Schema change already committed in epoch {}, skip", epoch);
            return Ok(());
        }

        tracing::info!(
            "Committing schema change {:?} in epoch {}",
            schema_change,
            epoch
        );
        self.commit_schema_change_impl(schema_change).await?;
        tracing::info!("Successfully committed schema change in epoch {epoch}");

        Ok(())
    }

    async fn abort(&mut self, _epoch: u64, _commit_metadata: Vec<u8>) {
        // TODO: Files that have been written but not committed should be deleted.
        tracing::debug!("Abort not implemented yet");
    }
}
2339
2340/// Methods Required to Achieve Exactly Once Semantics
2341impl IcebergSinkCommitter {
2342    fn pre_commit_inner(
2343        &mut self,
2344        _epoch: u64,
2345        metadata: Vec<SinkMetadata>,
2346    ) -> Result<Option<(Vec<IcebergCommitResult>, i64)>> {
2347        let write_results: Vec<IcebergCommitResult> = metadata
2348            .iter()
2349            .map(IcebergCommitResult::try_from)
2350            .collect::<Result<Vec<IcebergCommitResult>>>()?;
2351
2352        // Skip if no data to commit
2353        if write_results.is_empty() || write_results.iter().all(|r| r.data_files.is_empty()) {
2354            return Ok(None);
2355        }
2356
2357        let expect_schema_id = write_results[0].schema_id;
2358        let expect_partition_spec_id = write_results[0].partition_spec_id;
2359
2360        // guarantee that all write results has same schema_id and partition_spec_id
2361        if write_results
2362            .iter()
2363            .any(|r| r.schema_id != expect_schema_id)
2364            || write_results
2365                .iter()
2366                .any(|r| r.partition_spec_id != expect_partition_spec_id)
2367        {
2368            return Err(SinkError::Iceberg(anyhow!(
2369                "schema_id and partition_spec_id should be the same in all write results"
2370            )));
2371        }
2372
2373        let snapshot_id = FastAppendAction::generate_snapshot_id(&self.table);
2374
2375        Ok(Some((write_results, snapshot_id)))
2376    }
2377
    /// Commit the pre-committed `write_results` to the iceberg table as a single
    /// snapshot identified by the pre-generated `snapshot_id`, retrying commit
    /// conflicts with exponential backoff.
    ///
    /// Precondition: `write_results` is non-empty and contains at least one data
    /// file (asserted below); callers must filter out empty results beforehand.
    async fn commit_data_impl(
        &mut self,
        epoch: u64,
        write_results: Vec<IcebergCommitResult>,
        snapshot_id: i64,
    ) -> Result<()> {
        // Empty write results should be handled before calling this function.
        assert!(
            !write_results.is_empty() && !write_results.iter().all(|r| r.data_files.is_empty())
        );

        // Check snapshot limit before proceeding with commit
        self.wait_for_snapshot_limit().await?;

        // All results are expected to share these ids (validated at pre-commit).
        let expect_schema_id = write_results[0].schema_id;
        let expect_partition_spec_id = write_results[0].partition_spec_id;

        // Load the latest table to avoid concurrent modification with the best effort.
        self.table = Self::reload_table(
            self.catalog.as_ref(),
            self.table.identifier(),
            expect_schema_id,
            expect_partition_spec_id,
        )
        .await?;

        // Resolve the schema and partition spec the files were written against;
        // not finding either means the table has drifted incompatibly.
        let Some(schema) = self.table.metadata().schema_by_id(expect_schema_id) else {
            return Err(SinkError::Iceberg(anyhow!(
                "Can't find schema by id {}",
                expect_schema_id
            )));
        };
        let Some(partition_spec) = self
            .table
            .metadata()
            .partition_spec_by_id(expect_partition_spec_id)
        else {
            return Err(SinkError::Iceberg(anyhow!(
                "Can't find partition spec by id {}",
                expect_partition_spec_id
            )));
        };
        let partition_type = partition_spec
            .as_ref()
            .clone()
            .partition_type(schema)
            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;

        // Convert each stored file entry into a `DataFile` bound to the resolved
        // partition spec and schema; any conversion error aborts the commit.
        let data_files = write_results
            .into_iter()
            .flat_map(|r| {
                r.data_files.into_iter().map(|f| {
                    f.try_into(expect_partition_spec_id, &partition_type, schema)
                        .map_err(|err| SinkError::Iceberg(anyhow!(err)))
                })
            })
            .collect::<Result<Vec<DataFile>>>()?;
        // # TODO:
        // This retry behavior should be revert and do in iceberg-rust when it supports retry(Track in: https://github.com/apache/iceberg-rust/issues/964)
        // because retry logic involved reapply the commit metadata.
        // For now, we just retry the commit operation.
        let retry_strategy = ExponentialBackoff::from_millis(10)
            .max_delay(Duration::from_secs(60))
            .map(jitter)
            .take(self.commit_retry_num as usize);
        let catalog = self.catalog.clone();
        let table_ident = self.table.identifier().clone();

        // Custom retry logic that:
        // 1. Calls reload_table before each commit attempt to get the latest metadata
        // 2. If reload_table fails (table not exists/schema/partition mismatch), stops retrying immediately
        // 3. If commit fails, retries with backoff
        enum CommitError {
            ReloadTable(SinkError), // Non-retriable: schema/partition mismatch
            Commit(SinkError),      // Retriable: commit conflicts, network errors
        }

        let table = RetryIf::spawn(
            retry_strategy,
            || async {
                // Reload table before each commit attempt to get the latest metadata
                let table = Self::reload_table(
                    catalog.as_ref(),
                    &table_ident,
                    expect_schema_id,
                    expect_partition_spec_id,
                )
                .await
                .map_err(|e| {
                    tracing::error!(error = %e.as_report(), "Failed to reload iceberg table");
                    CommitError::ReloadTable(e)
                })?;

                // Fast-append the data files on the configured target branch,
                // reusing the same pre-generated snapshot id on every attempt.
                let txn = Transaction::new(&table);
                let append_action = txn
                    .fast_append()
                    .set_snapshot_id(snapshot_id)
                    .set_target_branch(commit_branch(
                        self.config.r#type.as_str(),
                        self.config.write_mode,
                    ))
                    .add_data_files(data_files.clone());

                let tx = append_action.apply(txn).map_err(|err| {
                    let err: IcebergError = err.into();
                    tracing::error!(error = %err.as_report(), "Failed to apply iceberg table");
                    CommitError::Commit(SinkError::Iceberg(anyhow!(err)))
                })?;

                tx.commit(catalog.as_ref()).await.map_err(|err| {
                    let err: IcebergError = err.into();
                    tracing::error!(error = %err.as_report(), "Failed to commit iceberg table");
                    CommitError::Commit(SinkError::Iceberg(anyhow!(err)))
                })
            },
            |err: &CommitError| {
                // Only retry on commit errors, not on reload_table errors
                match err {
                    CommitError::Commit(_) => {
                        tracing::warn!("Commit failed, will retry");
                        true
                    }
                    CommitError::ReloadTable(_) => {
                        tracing::error!(
                            "reload_table failed with non-retriable error, will not retry"
                        );
                        false
                    }
                }
            },
        )
        .await
        .map_err(|e| match e {
            CommitError::ReloadTable(e) | CommitError::Commit(e) => e,
        })?;
        self.table = table;

        // Export the post-commit snapshot count as a gauge for observability.
        let snapshot_num = self.table.metadata().snapshots().count();
        let catalog_name = self.config.common.catalog_name();
        let table_name = self.table.identifier().to_string();
        let metrics_labels = [&self.param.sink_name, &catalog_name, &table_name];
        GLOBAL_SINK_METRICS
            .iceberg_snapshot_num
            .with_guarded_label_values(&metrics_labels)
            .set(snapshot_num as i64);

        tracing::debug!("Succeeded to commit to iceberg table in epoch {epoch}.");

        // Best-effort notification to the compaction scheduler; a failed send
        // only logs a warning and does not fail the commit.
        if let Some(iceberg_compact_stat_sender) = &self.iceberg_compact_stat_sender
            && self.config.enable_compaction
            && iceberg_compact_stat_sender
                .send(IcebergSinkCompactionUpdate {
                    sink_id: self.sink_id,
                    compaction_interval: self.config.compaction_interval_sec(),
                    force_compaction: false,
                })
                .is_err()
        {
            warn!("failed to send iceberg compaction stats");
        }

        Ok(())
    }
2541
2542    /// During pre-commit metadata, we record the `snapshot_id` corresponding to each batch of files.
2543    /// Therefore, the logic for checking whether all files in this batch are present in Iceberg
2544    /// has been changed to verifying if their corresponding `snapshot_id` exists in Iceberg.
2545    async fn is_snapshot_id_in_iceberg(
2546        &self,
2547        iceberg_config: &IcebergConfig,
2548        snapshot_id: i64,
2549    ) -> Result<bool> {
2550        let table = iceberg_config.load_table().await?;
2551        if table.metadata().snapshot_by_id(snapshot_id).is_some() {
2552            Ok(true)
2553        } else {
2554            Ok(false)
2555        }
2556    }
2557
    /// Check if the specified columns already exist in the iceberg table's current schema.
    /// This is used to determine if schema change has already been applied.
    ///
    /// Returns `Ok(false)` when the current schema still equals the recorded
    /// original schema, `Ok(true)` when it equals original + added columns, and
    /// `Err` when it matches neither (the table changed in some other way).
    fn check_schema_change_applied(&self, schema_change: &PbSinkSchemaChange) -> Result<bool> {
        let current_schema = self.table.metadata().current_schema();
        let current_arrow_schema = schema_to_arrow_schema(current_schema.as_ref())
            .context("Failed to convert schema")
            .map_err(SinkError::Iceberg)?;

        let iceberg_arrow_convert = IcebergArrowConvert;

        // Order-insensitive comparison: same field count, and every expected
        // field has a same-named, same-typed counterpart in the current schema.
        let schema_matches = |expected: &[ArrowField]| {
            if current_arrow_schema.fields().len() != expected.len() {
                return false;
            }

            expected.iter().all(|expected_field| {
                current_arrow_schema.fields().iter().any(|current_field| {
                    current_field.name() == expected_field.name()
                        && current_field.data_type() == expected_field.data_type()
                })
            })
        };

        // Convert the recorded pre-change schema into arrow fields for comparison.
        let original_arrow_fields: Vec<ArrowField> = schema_change
            .original_schema
            .iter()
            .map(|pb_field| {
                let field = Field::from(pb_field);
                iceberg_arrow_convert
                    .to_arrow_field(&field.name, &field.data_type)
                    .context("Failed to convert field to arrow")
                    .map_err(SinkError::Iceberg)
            })
            .collect::<Result<_>>()?;

        // If current schema equals original_schema, then schema change is NOT applied.
        if schema_matches(&original_arrow_fields) {
            tracing::debug!(
                "Current iceberg schema matches original_schema ({} columns); schema change not applied",
                original_arrow_fields.len()
            );
            return Ok(false);
        }

        // We only support add_columns for now.
        let Some(risingwave_pb::stream_plan::sink_schema_change::Op::AddColumns(add_columns_op)) =
            schema_change.op.as_ref()
        else {
            return Err(SinkError::Iceberg(anyhow!(
                "Unsupported sink schema change op in iceberg sink: {:?}",
                schema_change.op
            )));
        };

        // Convert the columns to be added, then append them to the original
        // fields to form the expected post-change schema.
        let add_arrow_fields: Vec<ArrowField> = add_columns_op
            .fields
            .iter()
            .map(|pb_field| {
                let field = Field::from(pb_field);
                iceberg_arrow_convert
                    .to_arrow_field(&field.name, &field.data_type)
                    .context("Failed to convert field to arrow")
                    .map_err(SinkError::Iceberg)
            })
            .collect::<Result<_>>()?;

        let mut expected_after_change = original_arrow_fields;
        expected_after_change.extend(add_arrow_fields);

        // If current schema equals original_schema + add_columns, then schema change is applied.
        if schema_matches(&expected_after_change) {
            tracing::debug!(
                "Current iceberg schema matches original_schema + add_columns ({} columns); schema change already applied",
                expected_after_change.len()
            );
            return Ok(true);
        }

        Err(SinkError::Iceberg(anyhow!(
            "Current iceberg schema does not match either original_schema ({} cols) or original_schema + add_columns; cannot determine whether schema change is applied",
            schema_change.original_schema.len()
        )))
    }
2641
2642    /// Commit schema changes (e.g., add columns) to the iceberg table.
2643    /// This function uses Transaction API to atomically update the table schema
2644    /// with optimistic locking to prevent concurrent conflicts.
2645    async fn commit_schema_change_impl(&mut self, schema_change: PbSinkSchemaChange) -> Result<()> {
2646        use iceberg::spec::NestedField;
2647
2648        let Some(risingwave_pb::stream_plan::sink_schema_change::Op::AddColumns(add_columns_op)) =
2649            schema_change.op.as_ref()
2650        else {
2651            return Err(SinkError::Iceberg(anyhow!(
2652                "Unsupported sink schema change op in iceberg sink: {:?}",
2653                schema_change.op
2654            )));
2655        };
2656
2657        let add_columns = add_columns_op.fields.iter().map(Field::from).collect_vec();
2658
2659        // Step 1: Get current table metadata
2660        let metadata = self.table.metadata();
2661        let mut next_field_id = metadata.last_column_id() + 1;
2662        tracing::debug!("Starting schema change, next_field_id: {}", next_field_id);
2663
2664        // Step 2: Build new fields to add
2665        let iceberg_create_table_arrow_convert = IcebergCreateTableArrowConvert::default();
2666        let mut new_fields = Vec::new();
2667
2668        for field in &add_columns {
2669            // Convert RisingWave Field to Arrow Field using IcebergCreateTableArrowConvert
2670            let arrow_field = iceberg_create_table_arrow_convert
2671                .to_arrow_field(&field.name, &field.data_type)
2672                .with_context(|| format!("Failed to convert field '{}' to arrow", field.name))
2673                .map_err(SinkError::Iceberg)?;
2674
2675            // Convert Arrow DataType to Iceberg Type
2676            let iceberg_type = iceberg::arrow::arrow_type_to_type(arrow_field.data_type())
2677                .map_err(|err| {
2678                    SinkError::Iceberg(
2679                        anyhow!(err).context("Failed to convert Arrow type to Iceberg type"),
2680                    )
2681                })?;
2682
2683            // Create NestedField with the next available field ID
2684            let nested_field = Arc::new(NestedField::optional(
2685                next_field_id,
2686                &field.name,
2687                iceberg_type,
2688            ));
2689
2690            new_fields.push(nested_field);
2691            tracing::info!("Prepared field '{}' with ID {}", field.name, next_field_id);
2692            next_field_id += 1;
2693        }
2694
2695        // Step 3: Create Transaction with UpdateSchemaAction
2696        tracing::info!(
2697            "Committing schema change to catalog for table {}",
2698            self.table.identifier()
2699        );
2700
2701        let txn = Transaction::new(&self.table);
2702        let action = txn.update_schema().add_fields(new_fields);
2703
2704        let updated_table = action
2705            .apply(txn)
2706            .context("Failed to apply schema update action")
2707            .map_err(SinkError::Iceberg)?
2708            .commit(self.catalog.as_ref())
2709            .await
2710            .context("Failed to commit table schema change")
2711            .map_err(SinkError::Iceberg)?;
2712
2713        self.table = updated_table;
2714
2715        tracing::info!(
2716            "Successfully committed schema change, added {} columns to iceberg table",
2717            add_columns.len()
2718        );
2719
2720        Ok(())
2721    }
2722
2723    /// Check if the number of snapshots since the last rewrite/overwrite operation exceeds the limit
2724    /// Returns the number of snapshots since the last rewrite/overwrite
2725    fn count_snapshots_since_rewrite(&self) -> usize {
2726        let mut snapshots: Vec<_> = self.table.metadata().snapshots().collect();
2727        snapshots.sort_by_key(|b| std::cmp::Reverse(b.timestamp_ms()));
2728
2729        // Iterate through snapshots in reverse order (newest first) to find the last rewrite/overwrite
2730        let mut count = 0;
2731        for snapshot in snapshots {
2732            // Check if this snapshot represents a rewrite or overwrite operation
2733            let summary = snapshot.summary();
2734            match &summary.operation {
2735                Operation::Replace => {
2736                    // Found a rewrite/overwrite operation, stop counting
2737                    break;
2738                }
2739
2740                _ => {
2741                    // Increment count for each snapshot that is not a rewrite/overwrite
2742                    count += 1;
2743                }
2744            }
2745        }
2746
2747        count
2748    }
2749
    /// Wait until snapshot count since last rewrite is below the limit
    ///
    /// Only active when compaction is enabled and
    /// `max_snapshots_num_before_compaction` is configured; otherwise returns
    /// immediately. While the count is over the limit, it nudges the compaction
    /// scheduler with `force_compaction: true`, sleeps 30s, reloads the table,
    /// and re-checks — so a commit is delayed until compaction catches up.
    async fn wait_for_snapshot_limit(&mut self) -> Result<()> {
        if !self.config.enable_compaction {
            return Ok(());
        }

        if let Some(max_snapshots) = self.config.max_snapshots_num_before_compaction {
            loop {
                let current_count = self.count_snapshots_since_rewrite();

                if current_count < max_snapshots {
                    tracing::info!(
                        "Snapshot count check passed: {} < {}",
                        current_count,
                        max_snapshots
                    );
                    break;
                }

                tracing::info!(
                    "Snapshot count {} exceeds limit {}, waiting...",
                    current_count,
                    max_snapshots
                );

                // Ask the compactor to run as soon as possible; a failed send is
                // only logged and does not abort the wait loop.
                if let Some(iceberg_compact_stat_sender) = &self.iceberg_compact_stat_sender
                    && iceberg_compact_stat_sender
                        .send(IcebergSinkCompactionUpdate {
                            sink_id: self.sink_id,
                            compaction_interval: self.config.compaction_interval_sec(),
                            force_compaction: true,
                        })
                        .is_err()
                {
                    tracing::warn!("failed to send iceberg compaction stats");
                }

                // Wait for 30 seconds before checking again
                tokio::time::sleep(Duration::from_secs(30)).await;

                // Reload table to get latest snapshots
                self.table = self.config.load_table().await?;
            }
        }
        Ok(())
    }
2796}
2797
// Child-field names used when flattening a map type's key/value entries into
// the by-name lookup built by `get_fields`.
const MAP_KEY: &str = "key";
const MAP_VALUE: &str = "value";
2800
2801fn get_fields<'a>(
2802    our_field_type: &'a risingwave_common::types::DataType,
2803    data_type: &ArrowDataType,
2804    schema_fields: &mut HashMap<&'a str, &'a risingwave_common::types::DataType>,
2805) -> Option<ArrowFields> {
2806    match data_type {
2807        ArrowDataType::Struct(fields) => {
2808            match our_field_type {
2809                risingwave_common::types::DataType::Struct(struct_fields) => {
2810                    struct_fields.iter().for_each(|(name, data_type)| {
2811                        let res = schema_fields.insert(name, data_type);
2812                        // This assert is to make sure there is no duplicate field name in the schema.
2813                        assert!(res.is_none())
2814                    });
2815                }
2816                risingwave_common::types::DataType::Map(map_fields) => {
2817                    schema_fields.insert(MAP_KEY, map_fields.key());
2818                    schema_fields.insert(MAP_VALUE, map_fields.value());
2819                }
2820                risingwave_common::types::DataType::List(list) => {
2821                    list.elem()
2822                        .as_struct()
2823                        .iter()
2824                        .for_each(|(name, data_type)| {
2825                            let res = schema_fields.insert(name, data_type);
2826                            // This assert is to make sure there is no duplicate field name in the schema.
2827                            assert!(res.is_none())
2828                        });
2829                }
2830                _ => {}
2831            };
2832            Some(fields.clone())
2833        }
2834        ArrowDataType::List(field) | ArrowDataType::Map(field, _) => {
2835            get_fields(our_field_type, field.data_type(), schema_fields)
2836        }
2837        _ => None, // not a supported complex type and unlikely to show up
2838    }
2839}
2840
2841fn check_compatibility(
2842    schema_fields: HashMap<&str, &risingwave_common::types::DataType>,
2843    fields: &ArrowFields,
2844) -> anyhow::Result<bool> {
2845    for arrow_field in fields {
2846        let our_field_type = schema_fields
2847            .get(arrow_field.name().as_str())
2848            .ok_or_else(|| anyhow!("Field {} not found in our schema", arrow_field.name()))?;
2849
2850        // Iceberg source should be able to read iceberg decimal type.
2851        let converted_arrow_data_type = IcebergArrowConvert
2852            .to_arrow_field("", our_field_type)
2853            .map_err(|e| anyhow!(e))?
2854            .data_type()
2855            .clone();
2856
2857        let compatible = match (&converted_arrow_data_type, arrow_field.data_type()) {
2858            (ArrowDataType::Decimal128(_, _), ArrowDataType::Decimal128(_, _)) => true,
2859            (ArrowDataType::Binary, ArrowDataType::LargeBinary) => true,
2860            (ArrowDataType::LargeBinary, ArrowDataType::Binary) => true,
2861            (ArrowDataType::List(_), ArrowDataType::List(field))
2862            | (ArrowDataType::Map(_, _), ArrowDataType::Map(field, _)) => {
2863                let mut schema_fields = HashMap::new();
2864                get_fields(our_field_type, field.data_type(), &mut schema_fields)
2865                    .is_none_or(|fields| check_compatibility(schema_fields, &fields).unwrap())
2866            }
2867            // validate nested structs
2868            (ArrowDataType::Struct(_), ArrowDataType::Struct(fields)) => {
2869                let mut schema_fields = HashMap::new();
2870                our_field_type
2871                    .as_struct()
2872                    .iter()
2873                    .for_each(|(name, data_type)| {
2874                        let res = schema_fields.insert(name, data_type);
2875                        // This assert is to make sure there is no duplicate field name in the schema.
2876                        assert!(res.is_none())
2877                    });
2878                check_compatibility(schema_fields, fields)?
2879            }
2880            // cases where left != right (metadata, field name mismatch)
2881            //
2882            // all nested types: in iceberg `field_id` will always be present, but RW doesn't have it:
2883            // {"PARQUET:field_id": ".."}
2884            //
2885            // map: The standard name in arrow is "entries", "key", "value".
2886            // in iceberg-rs, it's called "key_value"
2887            (left, right) => left.equals_datatype(right),
2888        };
2889        if !compatible {
2890            bail!(
2891                "field {}'s type is incompatible\nRisingWave converted data type: {}\niceberg's data type: {}",
2892                arrow_field.name(),
2893                converted_arrow_data_type,
2894                arrow_field.data_type()
2895            );
2896        }
2897    }
2898    Ok(true)
2899}
2900
2901/// Try to match our schema with iceberg schema.
2902pub fn try_matches_arrow_schema(rw_schema: &Schema, arrow_schema: &ArrowSchema) -> Result<()> {
2903    if rw_schema.fields.len() != arrow_schema.fields().len() {
2904        bail!(
2905            "Schema length mismatch, risingwave is {}, and iceberg is {}",
2906            rw_schema.fields.len(),
2907            arrow_schema.fields.len()
2908        );
2909    }
2910
2911    let mut schema_fields = HashMap::new();
2912    rw_schema.fields.iter().for_each(|field| {
2913        let res = schema_fields.insert(field.name.as_str(), &field.data_type);
2914        // This assert is to make sure there is no duplicate field name in the schema.
2915        assert!(res.is_none())
2916    });
2917
2918    check_compatibility(schema_fields, &arrow_schema.fields)?;
2919    Ok(())
2920}
2921
/// Serialize per-writer metadata blobs into a single JSON byte buffer.
/// Infallible in practice: a `Vec<Vec<u8>>` always serializes to valid JSON.
fn serialize_metadata(metadata: Vec<Vec<u8>>) -> Vec<u8> {
    serde_json::to_vec(&metadata).unwrap()
}
2925
/// Inverse of `serialize_metadata`; panics if `bytes` is not the JSON encoding
/// of a `Vec<Vec<u8>>` produced there.
fn deserialize_metadata(bytes: Vec<u8>) -> Vec<Vec<u8>> {
    serde_json::from_slice(&bytes).unwrap()
}
2929
/// Parse a `partition_by` expression string into `(column, Transform)` pairs.
///
/// Accepted forms per entry: `column`, `transform(column)`,
/// `transform(n,column)`, `transform(n, column)`. `n` is required for `bucket`
/// and `truncate` and is re-encoded as `bucket[n]` / `truncate[n]`, the form
/// `Transform::from_str` expects.
pub fn parse_partition_by_exprs(
    expr: String,
) -> std::result::Result<Vec<(String, Transform)>, anyhow::Error> {
    // captures column, transform(column), transform(n,column), transform(n, column)
    let re = Regex::new(r"(?<transform>\w+)(\(((?<n>\d+)?(?:,|(,\s)))?(?<field>\w+)\))?").unwrap();
    if !re.is_match(&expr) {
        bail!(format!(
            "Invalid partition fields: {}\nHINT: Supported formats are column, transform(column), transform(n,column), transform(n, column)",
            expr
        ))
    }
    let caps = re.captures_iter(&expr);

    let mut partition_columns = vec![];

    for mat in caps {
        // Neither `n` nor `field` captured => a bare column name: identity transform.
        let (column, transform) = if mat.name("n").is_none() && mat.name("field").is_none() {
            (&mat["transform"], Transform::Identity)
        } else {
            let mut func = mat["transform"].to_owned();
            // `bucket`/`truncate` carry their numeric parameter in `name[n]` form.
            if func == "bucket" || func == "truncate" {
                let n = &mat
                    .name("n")
                    .ok_or_else(|| anyhow!("The `n` must be set with `bucket` and `truncate`"))?
                    .as_str();
                func = format!("{func}[{n}]");
            }
            (
                &mat["field"],
                Transform::from_str(&func)
                    .with_context(|| format!("invalid transform function {}", func))?,
            )
        };
        partition_columns.push((column.to_owned(), transform));
    }
    Ok(partition_columns)
}
2967
2968pub fn commit_branch(sink_type: &str, write_mode: IcebergWriteMode) -> String {
2969    if should_enable_iceberg_cow(sink_type, write_mode) {
2970        ICEBERG_COW_BRANCH.to_owned()
2971    } else {
2972        MAIN_BRANCH.to_owned()
2973    }
2974}
2975
2976pub fn should_enable_iceberg_cow(sink_type: &str, write_mode: IcebergWriteMode) -> bool {
2977    sink_type == SINK_TYPE_UPSERT && write_mode == IcebergWriteMode::CopyOnWrite
2978}
2979
// Marker impls — presumably required for these enums to appear as sink WITH
// option values; confirm against `crate::with_options::WithOptions`.
impl crate::with_options::WithOptions for IcebergWriteMode {}

impl crate::with_options::WithOptions for FormatVersion {}

impl crate::with_options::WithOptions for CompactionType {}
2985
2986#[cfg(test)]
2987mod test {
2988    use std::collections::BTreeMap;
2989
2990    use risingwave_common::array::arrow::arrow_schema_iceberg::FieldRef as ArrowFieldRef;
2991    use risingwave_common::types::{DataType, MapType, StructType};
2992
2993    use crate::connector_common::{IcebergCommon, IcebergTableIdentifier};
2994    use crate::sink::decouple_checkpoint_log_sink::ICEBERG_DEFAULT_COMMIT_CHECKPOINT_INTERVAL;
2995    use crate::sink::iceberg::{
2996        COMPACTION_INTERVAL_SEC, COMPACTION_MAX_SNAPSHOTS_NUM, CompactionType, ENABLE_COMPACTION,
2997        ENABLE_SNAPSHOT_EXPIRATION, FormatVersion, IcebergConfig, IcebergWriteMode,
2998        SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_FILES, SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_META_DATA,
2999        SNAPSHOT_EXPIRATION_MAX_AGE_MILLIS, SNAPSHOT_EXPIRATION_RETAIN_LAST, WRITE_MODE,
3000    };
3001
3002    pub const DEFAULT_ICEBERG_COMPACTION_INTERVAL: u64 = 3600; // 1 hour
3003
3004    #[test]
3005    fn test_compatible_arrow_schema() {
3006        use arrow_schema_iceberg::{DataType as ArrowDataType, Field as ArrowField};
3007
3008        use super::*;
3009        let risingwave_schema = Schema::new(vec![
3010            Field::with_name(DataType::Int32, "a"),
3011            Field::with_name(DataType::Int32, "b"),
3012            Field::with_name(DataType::Int32, "c"),
3013        ]);
3014        let arrow_schema = ArrowSchema::new(vec![
3015            ArrowField::new("a", ArrowDataType::Int32, false),
3016            ArrowField::new("b", ArrowDataType::Int32, false),
3017            ArrowField::new("c", ArrowDataType::Int32, false),
3018        ]);
3019
3020        try_matches_arrow_schema(&risingwave_schema, &arrow_schema).unwrap();
3021
3022        let risingwave_schema = Schema::new(vec![
3023            Field::with_name(DataType::Int32, "d"),
3024            Field::with_name(DataType::Int32, "c"),
3025            Field::with_name(DataType::Int32, "a"),
3026            Field::with_name(DataType::Int32, "b"),
3027        ]);
3028        let arrow_schema = ArrowSchema::new(vec![
3029            ArrowField::new("a", ArrowDataType::Int32, false),
3030            ArrowField::new("b", ArrowDataType::Int32, false),
3031            ArrowField::new("d", ArrowDataType::Int32, false),
3032            ArrowField::new("c", ArrowDataType::Int32, false),
3033        ]);
3034        try_matches_arrow_schema(&risingwave_schema, &arrow_schema).unwrap();
3035
3036        let risingwave_schema = Schema::new(vec![
3037            Field::with_name(
3038                DataType::Struct(StructType::new(vec![
3039                    ("a1", DataType::Int32),
3040                    (
3041                        "a2",
3042                        DataType::Struct(StructType::new(vec![
3043                            ("a21", DataType::Bytea),
3044                            (
3045                                "a22",
3046                                DataType::Map(MapType::from_kv(DataType::Varchar, DataType::Jsonb)),
3047                            ),
3048                        ])),
3049                    ),
3050                ])),
3051                "a",
3052            ),
3053            Field::with_name(
3054                DataType::list(DataType::Struct(StructType::new(vec![
3055                    ("b1", DataType::Int32),
3056                    ("b2", DataType::Bytea),
3057                    (
3058                        "b3",
3059                        DataType::Map(MapType::from_kv(DataType::Varchar, DataType::Jsonb)),
3060                    ),
3061                ]))),
3062                "b",
3063            ),
3064            Field::with_name(
3065                DataType::Map(MapType::from_kv(
3066                    DataType::Varchar,
3067                    DataType::list(DataType::Struct(StructType::new([
3068                        ("c1", DataType::Int32),
3069                        ("c2", DataType::Bytea),
3070                        (
3071                            "c3",
3072                            DataType::Map(MapType::from_kv(DataType::Varchar, DataType::Jsonb)),
3073                        ),
3074                    ]))),
3075                )),
3076                "c",
3077            ),
3078        ]);
3079        let arrow_schema = ArrowSchema::new(vec![
3080            ArrowField::new(
3081                "a",
3082                ArrowDataType::Struct(ArrowFields::from(vec![
3083                    ArrowField::new("a1", ArrowDataType::Int32, false),
3084                    ArrowField::new(
3085                        "a2",
3086                        ArrowDataType::Struct(ArrowFields::from(vec![
3087                            ArrowField::new("a21", ArrowDataType::LargeBinary, false),
3088                            ArrowField::new_map(
3089                                "a22",
3090                                "entries",
3091                                ArrowFieldRef::new(ArrowField::new(
3092                                    "key",
3093                                    ArrowDataType::Utf8,
3094                                    false,
3095                                )),
3096                                ArrowFieldRef::new(ArrowField::new(
3097                                    "value",
3098                                    ArrowDataType::Utf8,
3099                                    false,
3100                                )),
3101                                false,
3102                                false,
3103                            ),
3104                        ])),
3105                        false,
3106                    ),
3107                ])),
3108                false,
3109            ),
3110            ArrowField::new(
3111                "b",
3112                ArrowDataType::List(ArrowFieldRef::new(ArrowField::new_list_field(
3113                    ArrowDataType::Struct(ArrowFields::from(vec![
3114                        ArrowField::new("b1", ArrowDataType::Int32, false),
3115                        ArrowField::new("b2", ArrowDataType::LargeBinary, false),
3116                        ArrowField::new_map(
3117                            "b3",
3118                            "entries",
3119                            ArrowFieldRef::new(ArrowField::new("key", ArrowDataType::Utf8, false)),
3120                            ArrowFieldRef::new(ArrowField::new(
3121                                "value",
3122                                ArrowDataType::Utf8,
3123                                false,
3124                            )),
3125                            false,
3126                            false,
3127                        ),
3128                    ])),
3129                    false,
3130                ))),
3131                false,
3132            ),
3133            ArrowField::new_map(
3134                "c",
3135                "entries",
3136                ArrowFieldRef::new(ArrowField::new("key", ArrowDataType::Utf8, false)),
3137                ArrowFieldRef::new(ArrowField::new(
3138                    "value",
3139                    ArrowDataType::List(ArrowFieldRef::new(ArrowField::new_list_field(
3140                        ArrowDataType::Struct(ArrowFields::from(vec![
3141                            ArrowField::new("c1", ArrowDataType::Int32, false),
3142                            ArrowField::new("c2", ArrowDataType::LargeBinary, false),
3143                            ArrowField::new_map(
3144                                "c3",
3145                                "entries",
3146                                ArrowFieldRef::new(ArrowField::new(
3147                                    "key",
3148                                    ArrowDataType::Utf8,
3149                                    false,
3150                                )),
3151                                ArrowFieldRef::new(ArrowField::new(
3152                                    "value",
3153                                    ArrowDataType::Utf8,
3154                                    false,
3155                                )),
3156                                false,
3157                                false,
3158                            ),
3159                        ])),
3160                        false,
3161                    ))),
3162                    false,
3163                )),
3164                false,
3165                false,
3166            ),
3167        ]);
3168        try_matches_arrow_schema(&risingwave_schema, &arrow_schema).unwrap();
3169    }
3170
3171    #[test]
3172    fn test_parse_iceberg_config() {
3173        let values = [
3174            ("connector", "iceberg"),
3175            ("type", "upsert"),
3176            ("primary_key", "v1"),
3177            ("partition_by", "v1, identity(v1), truncate(4,v2), bucket(5,v1), year(v3), month(v4), day(v5), hour(v6), void(v1)"),
3178            ("warehouse.path", "s3://iceberg"),
3179            ("s3.endpoint", "http://127.0.0.1:9301"),
3180            ("s3.access.key", "hummockadmin"),
3181            ("s3.secret.key", "hummockadmin"),
3182            ("s3.path.style.access", "true"),
3183            ("s3.region", "us-east-1"),
3184            ("catalog.type", "jdbc"),
3185            ("catalog.name", "demo"),
3186            ("catalog.uri", "jdbc://postgresql://postgres:5432/iceberg"),
3187            ("catalog.jdbc.user", "admin"),
3188            ("catalog.jdbc.password", "123456"),
3189            ("database.name", "demo_db"),
3190            ("table.name", "demo_table"),
3191            ("enable_compaction", "true"),
3192            ("compaction_interval_sec", "1800"),
3193            ("enable_snapshot_expiration", "true"),
3194        ]
3195        .into_iter()
3196        .map(|(k, v)| (k.to_owned(), v.to_owned()))
3197        .collect();
3198
3199        let iceberg_config = IcebergConfig::from_btreemap(values).unwrap();
3200
3201        let expected_iceberg_config = IcebergConfig {
3202            common: IcebergCommon {
3203                warehouse_path: Some("s3://iceberg".to_owned()),
3204                catalog_uri: Some("jdbc://postgresql://postgres:5432/iceberg".to_owned()),
3205                s3_region: Some("us-east-1".to_owned()),
3206                s3_endpoint: Some("http://127.0.0.1:9301".to_owned()),
3207                s3_access_key: Some("hummockadmin".to_owned()),
3208                s3_secret_key: Some("hummockadmin".to_owned()),
3209                s3_iam_role_arn: None,
3210                gcs_credential: None,
3211                catalog_type: Some("jdbc".to_owned()),
3212                glue_id: None,
3213                glue_region: None,
3214                glue_access_key: None,
3215                glue_secret_key: None,
3216                glue_iam_role_arn: None,
3217                catalog_name: Some("demo".to_owned()),
3218                s3_path_style_access: Some(true),
3219                catalog_credential: None,
3220                catalog_oauth2_server_uri: None,
3221                catalog_scope: None,
3222                catalog_token: None,
3223                enable_config_load: None,
3224                rest_signing_name: None,
3225                rest_signing_region: None,
3226                rest_sigv4_enabled: None,
3227                hosted_catalog: None,
3228                azblob_account_name: None,
3229                azblob_account_key: None,
3230                azblob_endpoint_url: None,
3231                catalog_header: None,
3232                adlsgen2_account_name: None,
3233                adlsgen2_account_key: None,
3234                adlsgen2_endpoint: None,
3235                vended_credentials: None,
3236                catalog_security: None,
3237                gcp_auth_scopes: None,
3238                catalog_io_impl: None,
3239            },
3240            table: IcebergTableIdentifier {
3241                database_name: Some("demo_db".to_owned()),
3242                table_name: "demo_table".to_owned(),
3243            },
3244            r#type: "upsert".to_owned(),
3245            force_append_only: false,
3246            primary_key: Some(vec!["v1".to_owned()]),
3247            partition_by: Some("v1, identity(v1), truncate(4,v2), bucket(5,v1), year(v3), month(v4), day(v5), hour(v6), void(v1)".to_owned()),
3248            java_catalog_props: [("jdbc.user", "admin"), ("jdbc.password", "123456")]
3249                .into_iter()
3250                .map(|(k, v)| (k.to_owned(), v.to_owned()))
3251                .collect(),
3252            commit_checkpoint_interval: ICEBERG_DEFAULT_COMMIT_CHECKPOINT_INTERVAL,
3253            create_table_if_not_exists: false,
3254            is_exactly_once: Some(true),
3255            commit_retry_num: 8,
3256            enable_compaction: true,
3257            compaction_interval_sec: Some(DEFAULT_ICEBERG_COMPACTION_INTERVAL / 2),
3258            enable_snapshot_expiration: true,
3259            write_mode: IcebergWriteMode::MergeOnRead,
3260            format_version: FormatVersion::V2,
3261            snapshot_expiration_max_age_millis: None,
3262            snapshot_expiration_retain_last: None,
3263            snapshot_expiration_clear_expired_files: true,
3264            snapshot_expiration_clear_expired_meta_data: true,
3265            max_snapshots_num_before_compaction: None,
3266            small_files_threshold_mb: None,
3267            delete_files_count_threshold: None,
3268            trigger_snapshot_count: None,
3269            target_file_size_mb: None,
3270            compaction_type: None,
3271            write_parquet_compression: None,
3272            write_parquet_max_row_group_rows: None,
3273        };
3274
3275        assert_eq!(iceberg_config, expected_iceberg_config);
3276
3277        assert_eq!(
3278            &iceberg_config.full_table_name().unwrap().to_string(),
3279            "demo_db.demo_table"
3280        );
3281    }
3282
3283    async fn test_create_catalog(configs: BTreeMap<String, String>) {
3284        let iceberg_config = IcebergConfig::from_btreemap(configs).unwrap();
3285
3286        let _table = iceberg_config.load_table().await.unwrap();
3287    }
3288
3289    #[tokio::test]
3290    #[ignore]
3291    async fn test_storage_catalog() {
3292        let values = [
3293            ("connector", "iceberg"),
3294            ("type", "append-only"),
3295            ("force_append_only", "true"),
3296            ("s3.endpoint", "http://127.0.0.1:9301"),
3297            ("s3.access.key", "hummockadmin"),
3298            ("s3.secret.key", "hummockadmin"),
3299            ("s3.region", "us-east-1"),
3300            ("s3.path.style.access", "true"),
3301            ("catalog.name", "demo"),
3302            ("catalog.type", "storage"),
3303            ("warehouse.path", "s3://icebergdata/demo"),
3304            ("database.name", "s1"),
3305            ("table.name", "t1"),
3306        ]
3307        .into_iter()
3308        .map(|(k, v)| (k.to_owned(), v.to_owned()))
3309        .collect();
3310
3311        test_create_catalog(values).await;
3312    }
3313
3314    #[tokio::test]
3315    #[ignore]
3316    async fn test_rest_catalog() {
3317        let values = [
3318            ("connector", "iceberg"),
3319            ("type", "append-only"),
3320            ("force_append_only", "true"),
3321            ("s3.endpoint", "http://127.0.0.1:9301"),
3322            ("s3.access.key", "hummockadmin"),
3323            ("s3.secret.key", "hummockadmin"),
3324            ("s3.region", "us-east-1"),
3325            ("s3.path.style.access", "true"),
3326            ("catalog.name", "demo"),
3327            ("catalog.type", "rest"),
3328            ("catalog.uri", "http://192.168.167.4:8181"),
3329            ("warehouse.path", "s3://icebergdata/demo"),
3330            ("database.name", "s1"),
3331            ("table.name", "t1"),
3332        ]
3333        .into_iter()
3334        .map(|(k, v)| (k.to_owned(), v.to_owned()))
3335        .collect();
3336
3337        test_create_catalog(values).await;
3338    }
3339
3340    #[tokio::test]
3341    #[ignore]
3342    async fn test_jdbc_catalog() {
3343        let values = [
3344            ("connector", "iceberg"),
3345            ("type", "append-only"),
3346            ("force_append_only", "true"),
3347            ("s3.endpoint", "http://127.0.0.1:9301"),
3348            ("s3.access.key", "hummockadmin"),
3349            ("s3.secret.key", "hummockadmin"),
3350            ("s3.region", "us-east-1"),
3351            ("s3.path.style.access", "true"),
3352            ("catalog.name", "demo"),
3353            ("catalog.type", "jdbc"),
3354            ("catalog.uri", "jdbc:postgresql://localhost:5432/iceberg"),
3355            ("catalog.jdbc.user", "admin"),
3356            ("catalog.jdbc.password", "123456"),
3357            ("warehouse.path", "s3://icebergdata/demo"),
3358            ("database.name", "s1"),
3359            ("table.name", "t1"),
3360        ]
3361        .into_iter()
3362        .map(|(k, v)| (k.to_owned(), v.to_owned()))
3363        .collect();
3364
3365        test_create_catalog(values).await;
3366    }
3367
3368    #[tokio::test]
3369    #[ignore]
3370    async fn test_hive_catalog() {
3371        let values = [
3372            ("connector", "iceberg"),
3373            ("type", "append-only"),
3374            ("force_append_only", "true"),
3375            ("s3.endpoint", "http://127.0.0.1:9301"),
3376            ("s3.access.key", "hummockadmin"),
3377            ("s3.secret.key", "hummockadmin"),
3378            ("s3.region", "us-east-1"),
3379            ("s3.path.style.access", "true"),
3380            ("catalog.name", "demo"),
3381            ("catalog.type", "hive"),
3382            ("catalog.uri", "thrift://localhost:9083"),
3383            ("warehouse.path", "s3://icebergdata/demo"),
3384            ("database.name", "s1"),
3385            ("table.name", "t1"),
3386        ]
3387        .into_iter()
3388        .map(|(k, v)| (k.to_owned(), v.to_owned()))
3389        .collect();
3390
3391        test_create_catalog(values).await;
3392    }
3393
3394    /// Test parsing Google/BigLake authentication configuration.
3395    #[test]
3396    fn test_parse_google_auth_config() {
3397        let values: BTreeMap<String, String> = [
3398            ("connector", "iceberg"),
3399            ("type", "append-only"),
3400            ("force_append_only", "true"),
3401            ("catalog.name", "biglake-catalog"),
3402            ("catalog.type", "rest"),
3403            ("catalog.uri", "https://biglake.googleapis.com/iceberg/v1/restcatalog"),
3404            ("warehouse.path", "bq://projects/my-gcp-project"),
3405            ("catalog.header", "x-goog-user-project=my-gcp-project"),
3406            ("catalog.security", "google"),
3407            ("gcp.auth.scopes", "https://www.googleapis.com/auth/cloud-platform,https://www.googleapis.com/auth/bigquery"),
3408            ("database.name", "my_dataset"),
3409            ("table.name", "my_table"),
3410        ]
3411        .into_iter()
3412        .map(|(k, v)| (k.to_owned(), v.to_owned()))
3413        .collect();
3414
3415        let config = IcebergConfig::from_btreemap(values).unwrap();
3416        assert_eq!(config.catalog_type(), "rest");
3417        assert_eq!(config.common.catalog_security.as_deref(), Some("google"));
3418        assert_eq!(
3419            config.common.gcp_auth_scopes.as_deref(),
3420            Some(
3421                "https://www.googleapis.com/auth/cloud-platform,https://www.googleapis.com/auth/bigquery"
3422            )
3423        );
3424        assert_eq!(
3425            config.common.warehouse_path.as_deref(),
3426            Some("bq://projects/my-gcp-project")
3427        );
3428        assert_eq!(
3429            config.common.catalog_header.as_deref(),
3430            Some("x-goog-user-project=my-gcp-project")
3431        );
3432    }
3433
3434    /// Test parsing `oauth2` security configuration.
3435    #[test]
3436    fn test_parse_oauth2_security_config() {
3437        let values: BTreeMap<String, String> = [
3438            ("connector", "iceberg"),
3439            ("type", "append-only"),
3440            ("force_append_only", "true"),
3441            ("catalog.name", "oauth2-catalog"),
3442            ("catalog.type", "rest"),
3443            ("catalog.uri", "https://example.com/iceberg/rest"),
3444            ("warehouse.path", "s3://my-bucket/warehouse"),
3445            ("catalog.security", "oauth2"),
3446            ("catalog.credential", "client_id:client_secret"),
3447            ("catalog.token", "bearer-token"),
3448            (
3449                "catalog.oauth2_server_uri",
3450                "https://oauth.example.com/token",
3451            ),
3452            ("catalog.scope", "read write"),
3453            ("database.name", "test_db"),
3454            ("table.name", "test_table"),
3455        ]
3456        .into_iter()
3457        .map(|(k, v)| (k.to_owned(), v.to_owned()))
3458        .collect();
3459
3460        let iceberg_config = IcebergConfig::from_btreemap(values).unwrap();
3461
3462        // Verify catalog type
3463        assert_eq!(iceberg_config.catalog_type(), "rest");
3464
3465        // Verify OAuth2-specific options
3466        assert_eq!(
3467            iceberg_config.common.catalog_security.as_deref(),
3468            Some("oauth2")
3469        );
3470        assert_eq!(
3471            iceberg_config.common.catalog_credential.as_deref(),
3472            Some("client_id:client_secret")
3473        );
3474        assert_eq!(
3475            iceberg_config.common.catalog_token.as_deref(),
3476            Some("bearer-token")
3477        );
3478        assert_eq!(
3479            iceberg_config.common.catalog_oauth2_server_uri.as_deref(),
3480            Some("https://oauth.example.com/token")
3481        );
3482        assert_eq!(
3483            iceberg_config.common.catalog_scope.as_deref(),
3484            Some("read write")
3485        );
3486    }
3487
3488    /// Test parsing invalid security configuration.
3489    #[test]
3490    fn test_parse_invalid_security_config() {
3491        let values: BTreeMap<String, String> = [
3492            ("connector", "iceberg"),
3493            ("type", "append-only"),
3494            ("force_append_only", "true"),
3495            ("catalog.name", "invalid-catalog"),
3496            ("catalog.type", "rest"),
3497            ("catalog.uri", "https://example.com/iceberg/rest"),
3498            ("warehouse.path", "s3://my-bucket/warehouse"),
3499            ("catalog.security", "invalid_security_type"),
3500            ("database.name", "test_db"),
3501            ("table.name", "test_table"),
3502        ]
3503        .into_iter()
3504        .map(|(k, v)| (k.to_owned(), v.to_owned()))
3505        .collect();
3506
3507        // This should still parse successfully, but with a warning for unknown security type
3508        let iceberg_config = IcebergConfig::from_btreemap(values).unwrap();
3509
3510        // Verify that the invalid security type is still stored
3511        assert_eq!(
3512            iceberg_config.common.catalog_security.as_deref(),
3513            Some("invalid_security_type")
3514        );
3515
3516        // Verify catalog type
3517        assert_eq!(iceberg_config.catalog_type(), "rest");
3518    }
3519
3520    /// Test parsing custom `FileIO` implementation configuration.
3521    #[test]
3522    fn test_parse_custom_io_impl_config() {
3523        let values: BTreeMap<String, String> = [
3524            ("connector", "iceberg"),
3525            ("type", "append-only"),
3526            ("force_append_only", "true"),
3527            ("catalog.name", "gcs-catalog"),
3528            ("catalog.type", "rest"),
3529            ("catalog.uri", "https://example.com/iceberg/rest"),
3530            ("warehouse.path", "gs://my-bucket/warehouse"),
3531            ("catalog.security", "google"),
3532            ("catalog.io_impl", "org.apache.iceberg.gcp.gcs.GCSFileIO"),
3533            ("database.name", "test_db"),
3534            ("table.name", "test_table"),
3535        ]
3536        .into_iter()
3537        .map(|(k, v)| (k.to_owned(), v.to_owned()))
3538        .collect();
3539
3540        let iceberg_config = IcebergConfig::from_btreemap(values).unwrap();
3541
3542        // Verify catalog type
3543        assert_eq!(iceberg_config.catalog_type(), "rest");
3544
3545        // Verify custom `FileIO` implementation
3546        assert_eq!(
3547            iceberg_config.common.catalog_io_impl.as_deref(),
3548            Some("org.apache.iceberg.gcp.gcs.GCSFileIO")
3549        );
3550
3551        // Verify Google security is set
3552        assert_eq!(
3553            iceberg_config.common.catalog_security.as_deref(),
3554            Some("google")
3555        );
3556    }
3557
3558    #[test]
3559    fn test_config_constants_consistency() {
3560        // This test ensures our constants match the expected configuration names
3561        // If you change a constant, this test will remind you to update both places
3562        assert_eq!(ENABLE_COMPACTION, "enable_compaction");
3563        assert_eq!(COMPACTION_INTERVAL_SEC, "compaction_interval_sec");
3564        assert_eq!(ENABLE_SNAPSHOT_EXPIRATION, "enable_snapshot_expiration");
3565        assert_eq!(WRITE_MODE, "write_mode");
3566        assert_eq!(
3567            SNAPSHOT_EXPIRATION_RETAIN_LAST,
3568            "snapshot_expiration_retain_last"
3569        );
3570        assert_eq!(
3571            SNAPSHOT_EXPIRATION_MAX_AGE_MILLIS,
3572            "snapshot_expiration_max_age_millis"
3573        );
3574        assert_eq!(
3575            SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_FILES,
3576            "snapshot_expiration_clear_expired_files"
3577        );
3578        assert_eq!(
3579            SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_META_DATA,
3580            "snapshot_expiration_clear_expired_meta_data"
3581        );
3582        assert_eq!(COMPACTION_MAX_SNAPSHOTS_NUM, "compaction.max_snapshots_num");
3583    }
3584
3585    /// Test parsing all compaction.* prefix configs and their default values.
3586    #[test]
3587    fn test_parse_compaction_config() {
3588        // Test with all compaction configs specified
3589        let values: BTreeMap<String, String> = [
3590            ("connector", "iceberg"),
3591            ("type", "upsert"),
3592            ("primary_key", "id"),
3593            ("warehouse.path", "s3://iceberg"),
3594            ("s3.endpoint", "http://127.0.0.1:9301"),
3595            ("s3.access.key", "test"),
3596            ("s3.secret.key", "test"),
3597            ("s3.region", "us-east-1"),
3598            ("catalog.type", "storage"),
3599            ("catalog.name", "demo"),
3600            ("database.name", "test_db"),
3601            ("table.name", "test_table"),
3602            ("enable_compaction", "true"),
3603            ("compaction.max_snapshots_num", "100"),
3604            ("compaction.small_files_threshold_mb", "512"),
3605            ("compaction.delete_files_count_threshold", "50"),
3606            ("compaction.trigger_snapshot_count", "10"),
3607            ("compaction.target_file_size_mb", "256"),
3608            ("compaction.type", "full"),
3609            ("compaction.write_parquet_compression", "zstd"),
3610            ("compaction.write_parquet_max_row_group_rows", "50000"),
3611        ]
3612        .into_iter()
3613        .map(|(k, v)| (k.to_owned(), v.to_owned()))
3614        .collect();
3615
3616        let config = IcebergConfig::from_btreemap(values).unwrap();
3617        assert!(config.enable_compaction);
3618        assert_eq!(config.max_snapshots_num_before_compaction, Some(100));
3619        assert_eq!(config.small_files_threshold_mb, Some(512));
3620        assert_eq!(config.delete_files_count_threshold, Some(50));
3621        assert_eq!(config.trigger_snapshot_count, Some(10));
3622        assert_eq!(config.target_file_size_mb, Some(256));
3623        assert_eq!(config.compaction_type, Some(CompactionType::Full));
3624        assert_eq!(config.target_file_size_mb(), 256);
3625        assert_eq!(config.write_parquet_compression(), "zstd");
3626        assert_eq!(config.write_parquet_max_row_group_rows(), 50000);
3627
3628        // Test default values (no compaction configs specified)
3629        let values: BTreeMap<String, String> = [
3630            ("connector", "iceberg"),
3631            ("type", "append-only"),
3632            ("force_append_only", "true"),
3633            ("catalog.name", "test-catalog"),
3634            ("catalog.type", "storage"),
3635            ("warehouse.path", "s3://my-bucket/warehouse"),
3636            ("database.name", "test_db"),
3637            ("table.name", "test_table"),
3638        ]
3639        .into_iter()
3640        .map(|(k, v)| (k.to_owned(), v.to_owned()))
3641        .collect();
3642
3643        let config = IcebergConfig::from_btreemap(values).unwrap();
3644        assert_eq!(config.target_file_size_mb(), 1024); // Default
3645        assert_eq!(config.write_parquet_compression(), "zstd"); // Default
3646        assert_eq!(config.write_parquet_max_row_group_rows(), 122880); // Default
3647    }
3648
3649    /// Test parquet compression parsing.
3650    #[test]
3651    fn test_parse_parquet_compression() {
3652        use parquet::basic::Compression;
3653
3654        use super::parse_parquet_compression;
3655
3656        // Test valid compression types
3657        assert!(matches!(
3658            parse_parquet_compression("snappy"),
3659            Compression::SNAPPY
3660        ));
3661        assert!(matches!(
3662            parse_parquet_compression("gzip"),
3663            Compression::GZIP(_)
3664        ));
3665        assert!(matches!(
3666            parse_parquet_compression("zstd"),
3667            Compression::ZSTD(_)
3668        ));
3669        assert!(matches!(parse_parquet_compression("lz4"), Compression::LZ4));
3670        assert!(matches!(
3671            parse_parquet_compression("brotli"),
3672            Compression::BROTLI(_)
3673        ));
3674        assert!(matches!(
3675            parse_parquet_compression("uncompressed"),
3676            Compression::UNCOMPRESSED
3677        ));
3678
3679        // Test case insensitivity
3680        assert!(matches!(
3681            parse_parquet_compression("SNAPPY"),
3682            Compression::SNAPPY
3683        ));
3684        assert!(matches!(
3685            parse_parquet_compression("Zstd"),
3686            Compression::ZSTD(_)
3687        ));
3688
3689        // Test invalid compression (should fall back to SNAPPY)
3690        assert!(matches!(
3691            parse_parquet_compression("invalid"),
3692            Compression::SNAPPY
3693        ));
3694    }
3695
3696    #[test]
3697    fn test_append_only_rejects_copy_on_write() {
3698        // Test that append-only sinks reject copy-on-write mode
3699        let values = [
3700            ("connector", "iceberg"),
3701            ("type", "append-only"),
3702            ("warehouse.path", "s3://iceberg"),
3703            ("s3.endpoint", "http://127.0.0.1:9301"),
3704            ("s3.access.key", "test"),
3705            ("s3.secret.key", "test"),
3706            ("s3.region", "us-east-1"),
3707            ("catalog.type", "storage"),
3708            ("catalog.name", "demo"),
3709            ("database.name", "test_db"),
3710            ("table.name", "test_table"),
3711            ("write_mode", "copy-on-write"),
3712        ]
3713        .into_iter()
3714        .map(|(k, v)| (k.to_owned(), v.to_owned()))
3715        .collect();
3716
3717        let result = IcebergConfig::from_btreemap(values);
3718        assert!(result.is_err());
3719        assert!(
3720            result
3721                .unwrap_err()
3722                .to_string()
3723                .contains("'copy-on-write' mode is not supported for append-only iceberg sink")
3724        );
3725    }
3726
3727    #[test]
3728    fn test_append_only_accepts_merge_on_read() {
3729        // Test that append-only sinks accept merge-on-read mode (explicit)
3730        let values = [
3731            ("connector", "iceberg"),
3732            ("type", "append-only"),
3733            ("warehouse.path", "s3://iceberg"),
3734            ("s3.endpoint", "http://127.0.0.1:9301"),
3735            ("s3.access.key", "test"),
3736            ("s3.secret.key", "test"),
3737            ("s3.region", "us-east-1"),
3738            ("catalog.type", "storage"),
3739            ("catalog.name", "demo"),
3740            ("database.name", "test_db"),
3741            ("table.name", "test_table"),
3742            ("write_mode", "merge-on-read"),
3743        ]
3744        .into_iter()
3745        .map(|(k, v)| (k.to_owned(), v.to_owned()))
3746        .collect();
3747
3748        let result = IcebergConfig::from_btreemap(values);
3749        assert!(result.is_ok());
3750        let config = result.unwrap();
3751        assert_eq!(config.write_mode, IcebergWriteMode::MergeOnRead);
3752    }
3753
3754    #[test]
3755    fn test_append_only_defaults_to_merge_on_read() {
3756        // Test that append-only sinks default to merge-on-read when write_mode is not specified
3757        let values = [
3758            ("connector", "iceberg"),
3759            ("type", "append-only"),
3760            ("warehouse.path", "s3://iceberg"),
3761            ("s3.endpoint", "http://127.0.0.1:9301"),
3762            ("s3.access.key", "test"),
3763            ("s3.secret.key", "test"),
3764            ("s3.region", "us-east-1"),
3765            ("catalog.type", "storage"),
3766            ("catalog.name", "demo"),
3767            ("database.name", "test_db"),
3768            ("table.name", "test_table"),
3769        ]
3770        .into_iter()
3771        .map(|(k, v)| (k.to_owned(), v.to_owned()))
3772        .collect();
3773
3774        let result = IcebergConfig::from_btreemap(values);
3775        assert!(result.is_ok());
3776        let config = result.unwrap();
3777        assert_eq!(config.write_mode, IcebergWriteMode::MergeOnRead);
3778    }
3779
3780    #[test]
3781    fn test_upsert_accepts_copy_on_write() {
3782        // Test that upsert sinks accept copy-on-write mode
3783        let values = [
3784            ("connector", "iceberg"),
3785            ("type", "upsert"),
3786            ("primary_key", "id"),
3787            ("warehouse.path", "s3://iceberg"),
3788            ("s3.endpoint", "http://127.0.0.1:9301"),
3789            ("s3.access.key", "test"),
3790            ("s3.secret.key", "test"),
3791            ("s3.region", "us-east-1"),
3792            ("catalog.type", "storage"),
3793            ("catalog.name", "demo"),
3794            ("database.name", "test_db"),
3795            ("table.name", "test_table"),
3796            ("write_mode", "copy-on-write"),
3797        ]
3798        .into_iter()
3799        .map(|(k, v)| (k.to_owned(), v.to_owned()))
3800        .collect();
3801
3802        let result = IcebergConfig::from_btreemap(values);
3803        assert!(result.is_ok());
3804        let config = result.unwrap();
3805        assert_eq!(config.write_mode, IcebergWriteMode::CopyOnWrite);
3806    }
3807}