risingwave_connector/sink/iceberg/
mod.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod prometheus;
16use std::collections::{BTreeMap, HashMap};
17use std::fmt::Debug;
18use std::num::NonZeroU64;
19use std::str::FromStr;
20use std::sync::Arc;
21use std::time::Duration;
22
23use anyhow::{Context, anyhow};
24use async_trait::async_trait;
25use await_tree::InstrumentAwait;
26use iceberg::arrow::{
27    RecordBatchPartitionSplitter, arrow_schema_to_schema, schema_to_arrow_schema,
28};
29use iceberg::spec::{
30    DataFile, FormatVersion, MAIN_BRANCH, Operation, PartitionSpecRef,
31    SchemaRef as IcebergSchemaRef, SerializedDataFile, TableProperties, Transform,
32    UnboundPartitionField, UnboundPartitionSpec,
33};
34use iceberg::table::Table;
35use iceberg::transaction::{ApplyTransactionAction, FastAppendAction, Transaction};
36use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
37use iceberg::writer::base_writer::deletion_vector_writer::{
38    DeletionVectorWriter, DeletionVectorWriterBuilder,
39};
40use iceberg::writer::base_writer::equality_delete_writer::{
41    EqualityDeleteFileWriterBuilder, EqualityDeleteWriterConfig,
42};
43use iceberg::writer::base_writer::position_delete_file_writer::{
44    POSITION_DELETE_SCHEMA, PositionDeleteFileWriter, PositionDeleteFileWriterBuilder,
45    PositionDeleteInput,
46};
47use iceberg::writer::delta_writer::{DELETE_OP, DeltaWriterBuilder, INSERT_OP};
48use iceberg::writer::file_writer::ParquetWriterBuilder;
49use iceberg::writer::file_writer::location_generator::{
50    DefaultFileNameGenerator, DefaultLocationGenerator,
51};
52use iceberg::writer::file_writer::rolling_writer::RollingFileWriterBuilder;
53use iceberg::writer::task_writer::TaskWriter;
54use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
55use iceberg::{Catalog, NamespaceIdent, TableCreation, TableIdent};
56use itertools::Itertools;
57use parquet::basic::Compression;
58use parquet::file::properties::WriterProperties;
59use prometheus::monitored_general_writer::MonitoredGeneralWriterBuilder;
60use regex::Regex;
61use risingwave_common::array::arrow::arrow_array_iceberg::{Int32Array, RecordBatch};
62use risingwave_common::array::arrow::arrow_schema_iceberg::{
63    self, DataType as ArrowDataType, Field as ArrowField, Fields as ArrowFields,
64    Schema as ArrowSchema, SchemaRef,
65};
66use risingwave_common::array::arrow::{IcebergArrowConvert, IcebergCreateTableArrowConvert};
67use risingwave_common::array::{Op, StreamChunk};
68use risingwave_common::bail;
69use risingwave_common::bitmap::Bitmap;
70use risingwave_common::catalog::{Field, Schema};
71use risingwave_common::error::IcebergError;
72use risingwave_common::metrics::{LabelGuardedHistogram, LabelGuardedIntCounter};
73use risingwave_common_estimate_size::EstimateSize;
74use risingwave_pb::connector_service::SinkMetadata;
75use risingwave_pb::connector_service::sink_metadata::Metadata::Serialized;
76use risingwave_pb::connector_service::sink_metadata::SerializedMetadata;
77use risingwave_pb::stream_plan::PbSinkSchemaChange;
78use serde::de::{self, Deserializer, Visitor};
79use serde::{Deserialize, Serialize};
80use serde_json::from_value;
81use serde_with::{DisplayFromStr, serde_as};
82use thiserror_ext::AsReport;
83use tokio::sync::mpsc::UnboundedSender;
84use tokio_retry::RetryIf;
85use tokio_retry::strategy::{ExponentialBackoff, jitter};
86use tracing::warn;
87use url::Url;
88use uuid::Uuid;
89use with_options::WithOptions;
90
91use super::decouple_checkpoint_log_sink::iceberg_default_commit_checkpoint_interval;
92use super::{
93    GLOBAL_SINK_METRICS, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT, Sink,
94    SinkError, SinkWriterParam,
95};
96use crate::connector_common::{IcebergCommon, IcebergSinkCompactionUpdate, IcebergTableIdentifier};
97use crate::enforce_secret::EnforceSecret;
98use crate::sink::catalog::SinkId;
99use crate::sink::coordinate::CoordinatedLogSinker;
100use crate::sink::writer::SinkWriter;
101use crate::sink::{
102    Result, SinglePhaseCommitCoordinator, SinkCommitCoordinator, SinkParam,
103    TwoPhaseCommitCoordinator,
104};
105use crate::{deserialize_bool_from_string, deserialize_optional_string_seq_from_string};
106
107pub const ICEBERG_SINK: &str = "iceberg";
108
109pub const ICEBERG_COW_BRANCH: &str = "ingestion";
110pub const ICEBERG_WRITE_MODE_MERGE_ON_READ: &str = "merge-on-read";
111pub const ICEBERG_WRITE_MODE_COPY_ON_WRITE: &str = "copy-on-write";
112pub const ICEBERG_COMPACTION_TYPE_FULL: &str = "full";
113pub const ICEBERG_COMPACTION_TYPE_SMALL_FILES: &str = "small-files";
114pub const ICEBERG_COMPACTION_TYPE_FILES_WITH_DELETE: &str = "files-with-delete";
115
116pub const PARTITION_DATA_ID_START: i32 = 1000;
117
/// How writes are materialized into the Iceberg table.
///
/// Serialized in kebab-case so the serde form matches the user-facing
/// `write_mode` option values (`merge-on-read` / `copy-on-write`, see
/// `ICEBERG_WRITE_MODE_*` constants above).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
pub enum IcebergWriteMode {
    /// Default mode; corresponds to the `merge-on-read` option value.
    #[default]
    MergeOnRead,
    /// Corresponds to the `copy-on-write` option value. Rejected for
    /// append-only sinks in `IcebergConfig::validate_append_only_write_mode`.
    CopyOnWrite,
}
125
126impl IcebergWriteMode {
127    pub fn as_str(self) -> &'static str {
128        match self {
129            IcebergWriteMode::MergeOnRead => ICEBERG_WRITE_MODE_MERGE_ON_READ,
130            IcebergWriteMode::CopyOnWrite => ICEBERG_WRITE_MODE_COPY_ON_WRITE,
131        }
132    }
133}
134
135impl std::str::FromStr for IcebergWriteMode {
136    type Err = SinkError;
137
138    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
139        match s {
140            ICEBERG_WRITE_MODE_MERGE_ON_READ => Ok(IcebergWriteMode::MergeOnRead),
141            ICEBERG_WRITE_MODE_COPY_ON_WRITE => Ok(IcebergWriteMode::CopyOnWrite),
142            _ => Err(SinkError::Config(anyhow!(format!(
143                "invalid write_mode: {}, must be one of: {}, {}",
144                s, ICEBERG_WRITE_MODE_MERGE_ON_READ, ICEBERG_WRITE_MODE_COPY_ON_WRITE
145            )))),
146        }
147    }
148}
149
150impl TryFrom<&str> for IcebergWriteMode {
151    type Error = <Self as std::str::FromStr>::Err;
152
153    fn try_from(value: &str) -> std::result::Result<Self, Self::Error> {
154        value.parse()
155    }
156}
157
158impl TryFrom<String> for IcebergWriteMode {
159    type Error = <Self as std::str::FromStr>::Err;
160
161    fn try_from(value: String) -> std::result::Result<Self, Self::Error> {
162        value.as_str().parse()
163    }
164}
165
166impl std::fmt::Display for IcebergWriteMode {
167    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
168        f.write_str(self.as_str())
169    }
170}
171
172// Configuration constants
173pub const ENABLE_COMPACTION: &str = "enable_compaction";
174pub const COMPACTION_INTERVAL_SEC: &str = "compaction_interval_sec";
175pub const ENABLE_SNAPSHOT_EXPIRATION: &str = "enable_snapshot_expiration";
176pub const WRITE_MODE: &str = "write_mode";
177pub const FORMAT_VERSION: &str = "format_version";
178pub const SNAPSHOT_EXPIRATION_RETAIN_LAST: &str = "snapshot_expiration_retain_last";
179pub const SNAPSHOT_EXPIRATION_MAX_AGE_MILLIS: &str = "snapshot_expiration_max_age_millis";
180pub const SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_FILES: &str = "snapshot_expiration_clear_expired_files";
181pub const SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_META_DATA: &str =
182    "snapshot_expiration_clear_expired_meta_data";
183pub const COMPACTION_MAX_SNAPSHOTS_NUM: &str = "compaction.max_snapshots_num";
184
185pub const COMPACTION_SMALL_FILES_THRESHOLD_MB: &str = "compaction.small_files_threshold_mb";
186
187pub const COMPACTION_DELETE_FILES_COUNT_THRESHOLD: &str = "compaction.delete_files_count_threshold";
188
189pub const COMPACTION_TRIGGER_SNAPSHOT_COUNT: &str = "compaction.trigger_snapshot_count";
190
191pub const COMPACTION_TARGET_FILE_SIZE_MB: &str = "compaction.target_file_size_mb";
192
193pub const COMPACTION_TYPE: &str = "compaction.type";
194
195pub const COMPACTION_WRITE_PARQUET_COMPRESSION: &str = "compaction.write_parquet_compression";
196pub const COMPACTION_WRITE_PARQUET_MAX_ROW_GROUP_ROWS: &str =
197    "compaction.write_parquet_max_row_group_rows";
198pub const COMMIT_CHECKPOINT_SIZE_THRESHOLD_MB: &str = "commit_checkpoint_size_threshold_mb";
199pub const ICEBERG_DEFAULT_COMMIT_CHECKPOINT_SIZE_THRESHOLD_MB: u64 = 128;
200
201const PARQUET_CREATED_BY: &str = concat!("risingwave version ", env!("CARGO_PKG_VERSION"));
202
/// Serde default for `commit_retry_num`: retry failed Iceberg commits up to 8 times.
fn default_commit_retry_num() -> u32 {
    8
}
206
207fn default_commit_checkpoint_size_threshold_mb() -> Option<u64> {
208    Some(ICEBERG_DEFAULT_COMMIT_CHECKPOINT_SIZE_THRESHOLD_MB)
209}
210
211fn default_iceberg_write_mode() -> IcebergWriteMode {
212    IcebergWriteMode::MergeOnRead
213}
214
215fn default_iceberg_format_version() -> FormatVersion {
216    FormatVersion::V2
217}
218
/// Serde default helper for boolean options that default to `true`.
fn default_true() -> bool {
    true
}
222
/// Serde default helper for optional boolean options that default to `Some(true)`.
fn default_some_true() -> Option<bool> {
    Some(true)
}
226
227fn parse_format_version_str(value: &str) -> std::result::Result<FormatVersion, String> {
228    let parsed = value
229        .trim()
230        .parse::<u8>()
231        .map_err(|_| "`format-version` must be one of 1, 2, or 3".to_owned())?;
232    match parsed {
233        1 => Ok(FormatVersion::V1),
234        2 => Ok(FormatVersion::V2),
235        3 => Ok(FormatVersion::V3),
236        _ => Err("`format-version` must be one of 1, 2, or 3".to_owned()),
237    }
238}
239
240fn deserialize_format_version<'de, D>(
241    deserializer: D,
242) -> std::result::Result<FormatVersion, D::Error>
243where
244    D: Deserializer<'de>,
245{
246    struct FormatVersionVisitor;
247
248    impl<'de> Visitor<'de> for FormatVersionVisitor {
249        type Value = FormatVersion;
250
251        fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
252            formatter.write_str("format-version as 1, 2, or 3")
253        }
254
255        fn visit_u64<E>(self, value: u64) -> std::result::Result<Self::Value, E>
256        where
257            E: de::Error,
258        {
259            let value = u8::try_from(value)
260                .map_err(|_| E::custom("`format-version` must be one of 1, 2, or 3"))?;
261            parse_format_version_str(&value.to_string()).map_err(E::custom)
262        }
263
264        fn visit_i64<E>(self, value: i64) -> std::result::Result<Self::Value, E>
265        where
266            E: de::Error,
267        {
268            let value = u8::try_from(value)
269                .map_err(|_| E::custom("`format-version` must be one of 1, 2, or 3"))?;
270            parse_format_version_str(&value.to_string()).map_err(E::custom)
271        }
272
273        fn visit_str<E>(self, value: &str) -> std::result::Result<Self::Value, E>
274        where
275            E: de::Error,
276        {
277            parse_format_version_str(value).map_err(E::custom)
278        }
279
280        fn visit_string<E>(self, value: String) -> std::result::Result<Self::Value, E>
281        where
282            E: de::Error,
283        {
284            self.visit_str(&value)
285        }
286    }
287
288    deserializer.deserialize_any(FormatVersionVisitor)
289}
290
/// Compaction type for Iceberg sink
///
/// Serialized in kebab-case so the serde form matches the user-facing
/// `compaction.type` option values (see `ICEBERG_COMPACTION_TYPE_*` above).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(rename_all = "kebab-case")]
pub enum CompactionType {
    /// Full compaction - rewrites all data files
    #[default]
    Full,
    /// Small files compaction - only compact small files
    SmallFiles,
    /// Files with delete compaction - only compact files that have associated delete files
    FilesWithDelete,
}
303
304impl CompactionType {
305    pub fn as_str(&self) -> &'static str {
306        match self {
307            CompactionType::Full => ICEBERG_COMPACTION_TYPE_FULL,
308            CompactionType::SmallFiles => ICEBERG_COMPACTION_TYPE_SMALL_FILES,
309            CompactionType::FilesWithDelete => ICEBERG_COMPACTION_TYPE_FILES_WITH_DELETE,
310        }
311    }
312}
313
314impl std::str::FromStr for CompactionType {
315    type Err = SinkError;
316
317    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
318        match s {
319            ICEBERG_COMPACTION_TYPE_FULL => Ok(CompactionType::Full),
320            ICEBERG_COMPACTION_TYPE_SMALL_FILES => Ok(CompactionType::SmallFiles),
321            ICEBERG_COMPACTION_TYPE_FILES_WITH_DELETE => Ok(CompactionType::FilesWithDelete),
322            _ => Err(SinkError::Config(anyhow!(format!(
323                "invalid compaction_type: {}, must be one of: {}, {}, {}",
324                s,
325                ICEBERG_COMPACTION_TYPE_FULL,
326                ICEBERG_COMPACTION_TYPE_SMALL_FILES,
327                ICEBERG_COMPACTION_TYPE_FILES_WITH_DELETE
328            )))),
329        }
330    }
331}
332
333impl TryFrom<&str> for CompactionType {
334    type Error = <Self as std::str::FromStr>::Err;
335
336    fn try_from(value: &str) -> std::result::Result<Self, Self::Error> {
337        value.parse()
338    }
339}
340
341impl TryFrom<String> for CompactionType {
342    type Error = <Self as std::str::FromStr>::Err;
343
344    fn try_from(value: String) -> std::result::Result<Self, Self::Error> {
345        value.as_str().parse()
346    }
347}
348
349impl std::fmt::Display for CompactionType {
350    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
351        write!(f, "{}", self.as_str())
352    }
353}
354
/// User-facing options of the Iceberg sink, deserialized from the sink's
/// `WITH` clause (see `from_btreemap`, which routes the string map through a
/// serde JSON round-trip so the renames/flatten/`DisplayFromStr` attributes
/// apply).
// NOTE(review): `#[with_option(allow_alter_on_fly)]` presumably marks options
// that may be altered without recreating the sink — confirm against the
// `WithOptions` derive.
#[serde_as]
#[derive(Debug, Clone, PartialEq, Eq, Deserialize, WithOptions)]
pub struct IcebergConfig {
    pub r#type: String, // accept "append-only" or "upsert"

    /// Force writing an upsert stream as append-only.
    #[serde(default, deserialize_with = "deserialize_bool_from_string")]
    pub force_append_only: bool,

    /// Catalog / warehouse connection options, flattened into the same
    /// option namespace.
    #[serde(flatten)]
    common: IcebergCommon,

    /// Target table identifier (database + table name), flattened.
    #[serde(flatten)]
    table: IcebergTableIdentifier,

    /// Primary key columns; must be present and non-empty when
    /// `type = "upsert"` (enforced in `from_btreemap`).
    #[serde(
        rename = "primary_key",
        default,
        deserialize_with = "deserialize_optional_string_seq_from_string"
    )]
    pub primary_key: Option<Vec<String>>,

    // Props for java catalog props.
    // Not deserialized directly: `from_btreemap` fills this with every
    // `catalog.*` option except uri/type/name/header, prefix stripped.
    #[serde(skip)]
    pub java_catalog_props: HashMap<String, String>,

    /// Partitioning specification used when creating the table.
    // NOTE(review): exact syntax is parsed elsewhere in this module — not
    // visible here.
    #[serde(default)]
    pub partition_by: Option<String>,

    /// Commit every n(>0) checkpoints, default is 60.
    #[serde(default = "iceberg_default_commit_checkpoint_interval")]
    #[serde_as(as = "DisplayFromStr")]
    #[with_option(allow_alter_on_fly)]
    pub commit_checkpoint_interval: u64,

    /// Commit on the next checkpoint barrier after buffered write size exceeds this threshold.
    /// Default is 128 MB.
    #[serde(default = "default_commit_checkpoint_size_threshold_mb")]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    pub commit_checkpoint_size_threshold_mb: Option<u64>,

    /// Create the target table on startup when it does not exist yet.
    #[serde(default, deserialize_with = "deserialize_bool_from_string")]
    pub create_table_if_not_exists: bool,

    /// Whether it is `exactly_once`, the default is true.
    #[serde(default = "default_some_true")]
    #[serde_as(as = "Option<DisplayFromStr>")]
    pub is_exactly_once: Option<bool>,
    // Retry commit num when iceberg commit fail. default is 8.
    // # TODO
    // Iceberg table may store the retry commit num in table meta.
    // We should try to find and use that as default commit retry num first.
    #[serde(default = "default_commit_retry_num")]
    pub commit_retry_num: u32,

    /// Whether to enable iceberg compaction.
    #[serde(
        rename = "enable_compaction",
        default,
        deserialize_with = "deserialize_bool_from_string"
    )]
    #[with_option(allow_alter_on_fly)]
    pub enable_compaction: bool,

    /// The interval of iceberg compaction (seconds); defaults to 1 hour via
    /// `compaction_interval_sec()`.
    #[serde(rename = "compaction_interval_sec", default)]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    pub compaction_interval_sec: Option<u64>,

    /// Whether to enable iceberg expired snapshots.
    #[serde(
        rename = "enable_snapshot_expiration",
        default,
        deserialize_with = "deserialize_bool_from_string"
    )]
    #[with_option(allow_alter_on_fly)]
    pub enable_snapshot_expiration: bool,

    /// The iceberg write mode, can be `merge-on-read` or `copy-on-write`.
    /// `copy-on-write` is rejected for append-only sinks (see
    /// `validate_append_only_write_mode`).
    #[serde(rename = "write_mode", default = "default_iceberg_write_mode")]
    pub write_mode: IcebergWriteMode,

    /// Iceberg format version for table creation. Accepts 1, 2, or 3 as an
    /// integer or string; defaults to v2.
    #[serde(
        rename = "format_version",
        default = "default_iceberg_format_version",
        deserialize_with = "deserialize_format_version"
    )]
    pub format_version: FormatVersion,

    /// The maximum age (in milliseconds) for snapshots before they expire
    /// For example, if set to 3600000, snapshots older than 1 hour will be expired
    #[serde(rename = "snapshot_expiration_max_age_millis", default)]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    pub snapshot_expiration_max_age_millis: Option<i64>,

    /// The number of snapshots to retain
    #[serde(rename = "snapshot_expiration_retain_last", default)]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    pub snapshot_expiration_retain_last: Option<i32>,

    /// Whether expired data files are cleaned up during snapshot expiration.
    /// Defaults to true.
    #[serde(
        rename = "snapshot_expiration_clear_expired_files",
        default = "default_true",
        deserialize_with = "deserialize_bool_from_string"
    )]
    #[with_option(allow_alter_on_fly)]
    pub snapshot_expiration_clear_expired_files: bool,

    /// Whether expired metadata is cleaned up during snapshot expiration.
    /// Defaults to true.
    #[serde(
        rename = "snapshot_expiration_clear_expired_meta_data",
        default = "default_true",
        deserialize_with = "deserialize_bool_from_string"
    )]
    #[with_option(allow_alter_on_fly)]
    pub snapshot_expiration_clear_expired_meta_data: bool,

    /// The maximum number of snapshots allowed since the last rewrite operation
    /// If set, sink will check snapshot count and wait if exceeded
    #[serde(rename = "compaction.max_snapshots_num", default)]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    pub max_snapshots_num_before_compaction: Option<usize>,

    /// Size threshold (MB) below which a file counts as "small" for
    /// small-files compaction; defaults to 64 via `small_files_threshold_mb()`.
    #[serde(rename = "compaction.small_files_threshold_mb", default)]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    pub small_files_threshold_mb: Option<u64>,

    /// Delete-file count threshold for compaction; defaults to 256 via
    /// `delete_files_count_threshold()`.
    #[serde(rename = "compaction.delete_files_count_threshold", default)]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    pub delete_files_count_threshold: Option<usize>,

    /// Snapshot count that triggers compaction; unset means never trigger by
    /// count (`trigger_snapshot_count()` returns `usize::MAX`).
    #[serde(rename = "compaction.trigger_snapshot_count", default)]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    pub trigger_snapshot_count: Option<usize>,

    /// Target output file size (MB) for compaction; defaults to 1024 via
    /// `target_file_size_mb()`.
    #[serde(rename = "compaction.target_file_size_mb", default)]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    pub target_file_size_mb: Option<u64>,

    /// Compaction type: `full`, `small-files`, or `files-with-delete`
    /// If not set, will default to `full`
    #[serde(rename = "compaction.type", default)]
    #[with_option(allow_alter_on_fly)]
    pub compaction_type: Option<CompactionType>,

    /// Parquet compression codec
    /// Supported values: uncompressed, snappy, gzip, lzo, brotli, lz4, zstd
    /// Default is zstd (see `write_parquet_compression()`); unrecognized
    /// values fall back to snappy at parse time.
    #[serde(rename = "compaction.write_parquet_compression", default)]
    #[with_option(allow_alter_on_fly)]
    pub write_parquet_compression: Option<String>,

    /// Maximum number of rows in a Parquet row group
    /// Default is 122880 (from developer config)
    #[serde(rename = "compaction.write_parquet_max_row_group_rows", default)]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    pub write_parquet_max_row_group_rows: Option<usize>,
}
522
523impl EnforceSecret for IcebergConfig {
524    fn enforce_secret<'a>(
525        prop_iter: impl Iterator<Item = &'a str>,
526    ) -> crate::error::ConnectorResult<()> {
527        for prop in prop_iter {
528            IcebergCommon::enforce_one(prop)?;
529        }
530        Ok(())
531    }
532
533    fn enforce_one(prop: &str) -> crate::error::ConnectorResult<()> {
534        IcebergCommon::enforce_one(prop)
535    }
536}
537
impl IcebergConfig {
    /// Validate that append-only sinks use merge-on-read mode
    /// Copy-on-write is strictly worse than merge-on-read for append-only workloads
    fn validate_append_only_write_mode(
        sink_type: &str,
        write_mode: IcebergWriteMode,
    ) -> Result<()> {
        if sink_type == SINK_TYPE_APPEND_ONLY && write_mode == IcebergWriteMode::CopyOnWrite {
            return Err(SinkError::Config(anyhow!(
                "'copy-on-write' mode is not supported for append-only iceberg sink. \
                 Please use 'merge-on-read' instead, which is strictly better for append-only workloads."
            )));
        }
        Ok(())
    }

    /// Builds and validates an `IcebergConfig` from raw `WITH`-clause options.
    ///
    /// Deserialization goes through a serde-JSON round-trip so all field
    /// attributes (renames, flattening, `DisplayFromStr`) apply to the plain
    /// string map.
    ///
    /// # Errors
    /// Returns `SinkError::Config` when the sink type is not
    /// append-only/upsert, the primary key is missing or empty for upsert,
    /// copy-on-write is combined with append-only, the commit interval or
    /// size threshold is zero, or the table identifier is invalid.
    pub fn from_btreemap(values: BTreeMap<String, String>) -> Result<Self> {
        // `to_value` on a String->String map cannot fail, hence the unwrap.
        let mut config =
            serde_json::from_value::<IcebergConfig>(serde_json::to_value(&values).unwrap())
                .map_err(|e| SinkError::Config(anyhow!(e)))?;

        if config.r#type != SINK_TYPE_APPEND_ONLY && config.r#type != SINK_TYPE_UPSERT {
            return Err(SinkError::Config(anyhow!(
                "`{}` must be {}, or {}",
                SINK_TYPE_OPTION,
                SINK_TYPE_APPEND_ONLY,
                SINK_TYPE_UPSERT
            )));
        }

        // Upsert requires a non-empty primary key.
        // NOTE(review): error messages below say `primary-key` /
        // `commit-checkpoint-interval` (hyphens) while the option keys use
        // underscores — consider aligning the wording.
        if config.r#type == SINK_TYPE_UPSERT {
            if let Some(primary_key) = &config.primary_key {
                if primary_key.is_empty() {
                    return Err(SinkError::Config(anyhow!(
                        "`primary-key` must not be empty in {}",
                        SINK_TYPE_UPSERT
                    )));
                }
            } else {
                return Err(SinkError::Config(anyhow!(
                    "Must set `primary-key` in {}",
                    SINK_TYPE_UPSERT
                )));
            }
        }

        // Enforce merge-on-read for append-only sinks
        Self::validate_append_only_write_mode(&config.r#type, config.write_mode)?;

        // All configs start with "catalog." will be treated as java configs.
        // uri/type/name/header are handled natively and therefore excluded;
        // `k[8..]` strips the 8-byte "catalog." prefix.
        config.java_catalog_props = values
            .iter()
            .filter(|(k, _v)| {
                k.starts_with("catalog.")
                    && k != &"catalog.uri"
                    && k != &"catalog.type"
                    && k != &"catalog.name"
                    && k != &"catalog.header"
            })
            .map(|(k, v)| (k[8..].to_string(), v.clone()))
            .collect();

        if config.commit_checkpoint_interval == 0 {
            return Err(SinkError::Config(anyhow!(
                "`commit-checkpoint-interval` must be greater than 0"
            )));
        }

        if config.commit_checkpoint_size_threshold_mb == Some(0) {
            return Err(SinkError::Config(anyhow!(
                "`commit_checkpoint_size_threshold_mb` must be greater than 0"
            )));
        }

        // Validate table identifier (e.g., database.name should not contain dots)
        config
            .table
            .validate()
            .map_err(|e| SinkError::Config(anyhow!(e)))?;

        Ok(config)
    }

    /// Catalog type string from the common options (e.g. "rest").
    pub fn catalog_type(&self) -> &str {
        self.common.catalog_type()
    }

    /// Loads the target table from the catalog using the configured
    /// identifier and java catalog properties.
    pub async fn load_table(&self) -> Result<Table> {
        self.common
            .load_table(&self.table, &self.java_catalog_props)
            .await
            .map_err(Into::into)
    }

    /// Builds a catalog client from the common options.
    pub async fn create_catalog(&self) -> Result<Arc<dyn Catalog>> {
        self.common
            .create_catalog(&self.java_catalog_props)
            .await
            .map_err(Into::into)
    }

    /// The fully-qualified table identifier.
    pub fn full_table_name(&self) -> Result<TableIdent> {
        self.table.to_table_ident().map_err(Into::into)
    }

    /// The configured catalog name.
    pub fn catalog_name(&self) -> String {
        self.common.catalog_name()
    }

    /// The Iceberg format version used when creating the table.
    pub fn table_format_version(&self) -> FormatVersion {
        self.format_version
    }

    /// Compaction interval in seconds.
    pub fn compaction_interval_sec(&self) -> u64 {
        // default to 1 hour
        self.compaction_interval_sec.unwrap_or(3600)
    }

    /// Calculate the timestamp (in milliseconds) before which snapshots should be expired
    /// Returns `current_time_ms` - `max_age_millis`
    pub fn snapshot_expiration_timestamp_ms(&self, current_time_ms: i64) -> Option<i64> {
        self.snapshot_expiration_max_age_millis
            .map(|max_age_millis| current_time_ms - max_age_millis)
    }

    /// Snapshot count that triggers compaction; `usize::MAX` (never) when unset.
    pub fn trigger_snapshot_count(&self) -> usize {
        self.trigger_snapshot_count.unwrap_or(usize::MAX)
    }

    /// Small-file size threshold in MB (default 64).
    pub fn small_files_threshold_mb(&self) -> u64 {
        self.small_files_threshold_mb.unwrap_or(64)
    }

    /// Delete-file count threshold for compaction (default 256).
    pub fn delete_files_count_threshold(&self) -> usize {
        self.delete_files_count_threshold.unwrap_or(256)
    }

    /// Compaction target file size in MB (default 1024).
    pub fn target_file_size_mb(&self) -> u64 {
        self.target_file_size_mb.unwrap_or(1024)
    }

    /// The commit size threshold converted from MB to bytes (saturating).
    pub fn commit_checkpoint_size_threshold_bytes(&self) -> Option<u64> {
        self.commit_checkpoint_size_threshold_mb
            .map(|threshold_mb| threshold_mb.saturating_mul(1024 * 1024))
    }

    /// Get the compaction type as an enum
    /// Falls back to `CompactionType::Full` when not configured.
    pub fn compaction_type(&self) -> CompactionType {
        self.compaction_type.unwrap_or_default()
    }

    /// Get the parquet compression codec
    /// Default is "zstd"
    pub fn write_parquet_compression(&self) -> &str {
        self.write_parquet_compression.as_deref().unwrap_or("zstd")
    }

    /// Get the maximum number of rows in a Parquet row group
    /// Default is 122880 (from developer config default)
    pub fn write_parquet_max_row_group_rows(&self) -> usize {
        self.write_parquet_max_row_group_rows.unwrap_or(122880)
    }

    /// Parse the configured compression codec string into the Parquet
    /// `Compression` enum. When the option is unset the codec defaults to
    /// zstd; only an *unrecognized* codec string falls back to SNAPPY
    /// (see `parse_parquet_compression`).
    pub fn get_parquet_compression(&self) -> Compression {
        parse_parquet_compression(self.write_parquet_compression())
    }
}
708
709/// Parse compression codec string to Parquet Compression enum
710fn parse_parquet_compression(codec: &str) -> Compression {
711    match codec.to_lowercase().as_str() {
712        "uncompressed" => Compression::UNCOMPRESSED,
713        "snappy" => Compression::SNAPPY,
714        "gzip" => Compression::GZIP(Default::default()),
715        "lzo" => Compression::LZO,
716        "brotli" => Compression::BROTLI(Default::default()),
717        "lz4" => Compression::LZ4,
718        "zstd" => Compression::ZSTD(Default::default()),
719        _ => {
720            tracing::warn!(
721                "Unknown compression codec '{}', falling back to SNAPPY",
722                codec
723            );
724            Compression::SNAPPY
725        }
726    }
727}
728
/// The Iceberg sink connector (`connector = 'iceberg'`).
pub struct IcebergSink {
    /// Parsed and validated sink options.
    pub config: IcebergConfig,
    /// Original sink parameters (schema, columns, properties).
    param: SinkParam,
    // In upsert mode, this is never `None` and never empty.
    unique_column_ids: Option<Vec<usize>>,
}
735
736impl EnforceSecret for IcebergSink {
737    fn enforce_secret<'a>(
738        prop_iter: impl Iterator<Item = &'a str>,
739    ) -> crate::error::ConnectorResult<()> {
740        for prop in prop_iter {
741            IcebergConfig::enforce_one(prop)?;
742        }
743        Ok(())
744    }
745}
746
747impl TryFrom<SinkParam> for IcebergSink {
748    type Error = SinkError;
749
750    fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
751        let config = IcebergConfig::from_btreemap(param.properties.clone())?;
752        IcebergSink::new(config, param)
753    }
754}
755
impl Debug for IcebergSink {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Only `config` is rendered; `param` and `unique_column_ids` are
        // intentionally omitted from the debug output.
        f.debug_struct("IcebergSink")
            .field("config", &self.config)
            .finish()
    }
}
763
/// Loads the sink's target Iceberg table — creating it first when
/// `create_table_if_not_exists` is set — and verifies that the sink schema is
/// compatible with the table's current schema.
///
/// # Errors
/// Returns `SinkError::Iceberg` when the table cannot be loaded, its schema
/// cannot be converted to Arrow, or the schemas do not match.
async fn create_and_validate_table_impl(
    config: &IcebergConfig,
    param: &SinkParam,
) -> Result<Table> {
    if config.create_table_if_not_exists {
        create_table_if_not_exists_impl(config, param).await?;
    }

    let table = config
        .load_table()
        .await
        .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;

    let sink_schema = param.schema();
    // Convert the table's current Iceberg schema to Arrow so it can be
    // compared against the RisingWave sink schema.
    let iceberg_arrow_schema = schema_to_arrow_schema(table.metadata().current_schema())
        .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;

    try_matches_arrow_schema(&sink_schema, &iceberg_arrow_schema)
        .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;

    Ok(table)
}
786
/// Create the iceberg table targeted by `config` if it does not already exist,
/// creating its namespace first when necessary.
///
/// The table schema is derived from the sink columns via
/// risingwave schema -> arrow schema -> iceberg schema. Errors if the database
/// name is unset or the warehouse path is invalid for the configured catalog.
async fn create_table_if_not_exists_impl(config: &IcebergConfig, param: &SinkParam) -> Result<()> {
    let catalog = config.create_catalog().await?;
    // Resolve the namespace from the configured database name, creating it in
    // the catalog if it is missing. A database name is mandatory here.
    let namespace = if let Some(database_name) = config.table.database_name() {
        let namespace = NamespaceIdent::new(database_name.to_owned());
        if !catalog
            .namespace_exists(&namespace)
            .await
            .map_err(|e| SinkError::Iceberg(anyhow!(e)))?
        {
            catalog
                .create_namespace(&namespace, HashMap::default())
                .await
                .map_err(|e| SinkError::Iceberg(anyhow!(e)))
                .context("failed to create iceberg namespace")?;
        }
        namespace
    } else {
        bail!("database name must be set if you want to create table")
    };

    let table_id = config
        .full_table_name()
        .context("Unable to parse table name")?;
    // Only create the table when it does not exist yet; otherwise this is a no-op.
    if !catalog
        .table_exists(&table_id)
        .await
        .map_err(|e| SinkError::Iceberg(anyhow!(e)))?
    {
        let iceberg_create_table_arrow_convert = IcebergCreateTableArrowConvert::default();
        // convert risingwave schema -> arrow schema -> iceberg schema
        let arrow_fields = param
            .columns
            .iter()
            .map(|column| {
                Ok(iceberg_create_table_arrow_convert
                    .to_arrow_field(&column.name, &column.data_type)
                    .map_err(|e| SinkError::Iceberg(anyhow!(e)))
                    .context(format!(
                        "failed to convert {}: {} to arrow type",
                        &column.name, &column.data_type
                    ))?)
            })
            .collect::<Result<Vec<ArrowField>>>()?;
        let arrow_schema = arrow_schema_iceberg::Schema::new(arrow_fields);
        let iceberg_schema = iceberg::arrow::arrow_schema_to_schema(&arrow_schema)
            .map_err(|e| SinkError::Iceberg(anyhow!(e)))
            .context("failed to convert arrow schema to iceberg schema")?;

        // Derive the table location from the warehouse path (if any), appending
        // the namespace segments and table name.
        let location = {
            let mut names = namespace.clone().inner();
            names.push(config.table.table_name().to_owned());
            match &config.common.warehouse_path {
                Some(warehouse_path) => {
                    let is_s3_tables = warehouse_path.starts_with("arn:aws:s3tables");
                    // BigLake catalog federation uses bq:// prefix for BigQuery-managed Iceberg tables
                    let is_bq_catalog_federation = warehouse_path.starts_with("bq://");
                    let url = Url::parse(warehouse_path);
                    if url.is_err() || is_s3_tables || is_bq_catalog_federation {
                        // For rest catalog, the warehouse_path could be a warehouse name.
                        // In this case, we should specify the location when creating a table.
                        if config.common.catalog_type() == "rest"
                            || config.common.catalog_type() == "rest_rust"
                        {
                            None
                        } else {
                            bail!(format!("Invalid warehouse path: {}", warehouse_path))
                        }
                    } else if warehouse_path.ends_with('/') {
                        Some(format!("{}{}", warehouse_path, names.join("/")))
                    } else {
                        Some(format!("{}/{}", warehouse_path, names.join("/")))
                    }
                }
                None => None,
            }
        };

        // Build an unbound partition spec from the user-specified `partition_by`
        // expressions; every referenced source column must exist in the schema.
        let partition_spec = match &config.partition_by {
            Some(partition_by) => {
                let mut partition_fields = Vec::<UnboundPartitionField>::new();
                for (i, (column, transform)) in parse_partition_by_exprs(partition_by.clone())?
                    .into_iter()
                    .enumerate()
                {
                    match iceberg_schema.field_id_by_name(&column) {
                        Some(id) => partition_fields.push(
                            UnboundPartitionField::builder()
                                .source_id(id)
                                .transform(transform)
                                // Partition field names are prefixed to avoid
                                // clashing with data column names.
                                .name(format!("_p_{}", column))
                                .field_id(PARTITION_DATA_ID_START + i as i32)
                                .build(),
                        ),
                        None => bail!(format!(
                            "Partition source column does not exist in schema: {}",
                            column
                        )),
                    };
                }
                Some(
                    UnboundPartitionSpec::builder()
                        .with_spec_id(0)
                        .add_partition_fields(partition_fields)
                        .map_err(|e| SinkError::Iceberg(anyhow!(e)))
                        .context("failed to add partition columns")?
                        .build(),
                )
            }
            None => None,
        };

        // Put format-version into table properties, because catalog like jdbc extract format-version from table properties.
        let properties = HashMap::from([(
            TableProperties::PROPERTY_FORMAT_VERSION.to_owned(),
            (config.format_version as u8).to_string(),
        )]);

        let table_creation_builder = TableCreation::builder()
            .name(config.table.table_name().to_owned())
            .schema(iceberg_schema)
            .format_version(config.table_format_version())
            .properties(properties);

        // `location` and `partition_spec` are both optional builder fields, so
        // each combination is finalized explicitly.
        let table_creation = match (location, partition_spec) {
            (Some(location), Some(partition_spec)) => table_creation_builder
                .location(location)
                .partition_spec(partition_spec)
                .build(),
            (Some(location), None) => table_creation_builder.location(location).build(),
            (None, Some(partition_spec)) => table_creation_builder
                .partition_spec(partition_spec)
                .build(),
            (None, None) => table_creation_builder.build(),
        };

        catalog
            .create_table(&namespace, table_creation)
            .await
            .map_err(|e| SinkError::Iceberg(anyhow!(e)))
            .context("failed to create iceberg table")?;
    }
    Ok(())
}
930
931impl IcebergSink {
932    pub async fn create_and_validate_table(&self) -> Result<Table> {
933        create_and_validate_table_impl(&self.config, &self.param).await
934    }
935
936    pub async fn create_table_if_not_exists(&self) -> Result<()> {
937        create_table_if_not_exists_impl(&self.config, &self.param).await
938    }
939
940    pub fn new(config: IcebergConfig, param: SinkParam) -> Result<Self> {
941        let unique_column_ids = if config.r#type == SINK_TYPE_UPSERT && !config.force_append_only {
942            if let Some(pk) = &config.primary_key {
943                let mut unique_column_ids = Vec::with_capacity(pk.len());
944                for col_name in pk {
945                    let id = param
946                        .columns
947                        .iter()
948                        .find(|col| col.name.as_str() == col_name)
949                        .ok_or_else(|| {
950                            SinkError::Config(anyhow!(
951                                "Primary key column {} not found in sink schema",
952                                col_name
953                            ))
954                        })?
955                        .column_id
956                        .get_id() as usize;
957                    unique_column_ids.push(id);
958                }
959                Some(unique_column_ids)
960            } else {
961                unreachable!()
962            }
963        } else {
964            None
965        };
966        Ok(Self {
967            config,
968            param,
969            unique_column_ids,
970        })
971    }
972}
973
impl Sink for IcebergSink {
    type LogSinker = CoordinatedLogSinker<IcebergSinkWriter>;

    const SINK_NAME: &'static str = ICEBERG_SINK;

    /// Validate catalog type, licensed features, write mode and compaction
    /// settings, then ensure the target table exists with a compatible schema.
    async fn validate(&self) -> Result<()> {
        if "snowflake".eq_ignore_ascii_case(self.config.catalog_type()) {
            bail!("Snowflake catalog only supports iceberg sources");
        }

        // Glue catalog support is gated behind a license feature.
        if "glue".eq_ignore_ascii_case(self.config.catalog_type()) {
            risingwave_common::license::Feature::IcebergSinkWithGlue
                .check_available()
                .map_err(|e| anyhow::anyhow!(e))?;
        }

        // Enforce merge-on-read for append-only tables
        IcebergConfig::validate_append_only_write_mode(
            &self.config.r#type,
            self.config.write_mode,
        )?;

        // Validate compaction type configuration
        let compaction_type = self.config.compaction_type();

        // Check COW mode constraints
        // COW mode only supports 'full' compaction type
        if self.config.write_mode == IcebergWriteMode::CopyOnWrite
            && compaction_type != CompactionType::Full
        {
            bail!(
                "'copy-on-write' mode only supports 'full' compaction type, got: '{}'",
                compaction_type
            );
        }

        // Per-compaction-type constraints (license, write mode, conflicting knobs).
        match compaction_type {
            CompactionType::SmallFiles => {
                // 1. check license
                risingwave_common::license::Feature::IcebergCompaction
                    .check_available()
                    .map_err(|e| anyhow::anyhow!(e))?;

                // 2. check write mode
                if self.config.write_mode != IcebergWriteMode::MergeOnRead {
                    bail!(
                        "'small-files' compaction type only supports 'merge-on-read' write mode, got: '{}'",
                        self.config.write_mode
                    );
                }

                // 3. check conflicting parameters
                if self.config.delete_files_count_threshold.is_some() {
                    bail!(
                        "`compaction.delete-files-count-threshold` is not supported for 'small-files' compaction type"
                    );
                }
            }
            CompactionType::FilesWithDelete => {
                // 1. check license
                risingwave_common::license::Feature::IcebergCompaction
                    .check_available()
                    .map_err(|e| anyhow::anyhow!(e))?;

                // 2. check write mode
                if self.config.write_mode != IcebergWriteMode::MergeOnRead {
                    bail!(
                        "'files-with-delete' compaction type only supports 'merge-on-read' write mode, got: '{}'",
                        self.config.write_mode
                    );
                }

                // 3. check conflicting parameters
                if self.config.small_files_threshold_mb.is_some() {
                    bail!(
                        "`compaction.small-files-threshold-mb` must not be set for 'files-with-delete' compaction type"
                    );
                }
            }
            CompactionType::Full => {
                // Full compaction has no special requirements
            }
        }

        let _ = self.create_and_validate_table().await?;
        Ok(())
    }

    /// Iceberg sinks accept upstream schema changes.
    fn support_schema_change() -> bool {
        true
    }

    /// Validate an `ALTER SINK` config change: compaction interval, snapshot
    /// count, target file size, parquet row-group size and compression codec.
    fn validate_alter_config(config: &BTreeMap<String, String>) -> Result<()> {
        let iceberg_config = IcebergConfig::from_btreemap(config.clone())?;

        // Validate compaction interval
        if let Some(compaction_interval) = iceberg_config.compaction_interval_sec {
            if iceberg_config.enable_compaction && compaction_interval == 0 {
                bail!(
                    "`compaction-interval-sec` must be greater than 0 when `enable-compaction` is true"
                );
            }

            // log compaction_interval
            tracing::info!(
                "Alter config compaction_interval set to {} seconds",
                compaction_interval
            );
        }

        // Validate max snapshots
        if let Some(max_snapshots) = iceberg_config.max_snapshots_num_before_compaction
            && max_snapshots < 1
        {
            bail!(
                "`compaction.max-snapshots-num` must be greater than 0, got: {}",
                max_snapshots
            );
        }

        // Validate target file size
        if let Some(target_file_size_mb) = iceberg_config.target_file_size_mb
            && target_file_size_mb == 0
        {
            bail!("`compaction.target_file_size_mb` must be greater than 0");
        }

        // Validate parquet max row group rows
        if let Some(max_row_group_rows) = iceberg_config.write_parquet_max_row_group_rows
            && max_row_group_rows == 0
        {
            bail!("`compaction.write_parquet_max_row_group_rows` must be greater than 0");
        }

        // Validate parquet compression codec
        if let Some(ref compression) = iceberg_config.write_parquet_compression {
            let valid_codecs = [
                "uncompressed",
                "snappy",
                "gzip",
                "lzo",
                "brotli",
                "lz4",
                "zstd",
            ];
            if !valid_codecs.contains(&compression.to_lowercase().as_str()) {
                bail!(
                    "`compaction.write_parquet_compression` must be one of {:?}, got: {}",
                    valid_codecs,
                    compression
                );
            }
        }

        Ok(())
    }

    /// Create the coordinated log sinker that wraps a lazily-initialized
    /// [`IcebergSinkWriter`], committing every `commit_checkpoint_interval`
    /// checkpoints.
    async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
        let writer = IcebergSinkWriter::new(
            self.config.clone(),
            self.param.clone(),
            writer_param.clone(),
            self.unique_column_ids.clone(),
        );

        let commit_checkpoint_interval =
            NonZeroU64::new(self.config.commit_checkpoint_interval).expect(
                "commit_checkpoint_interval should be greater than 0, and it should be checked in config validation",
            );
        let log_sinker = CoordinatedLogSinker::new(
            &writer_param,
            self.param.clone(),
            writer,
            commit_checkpoint_interval,
        )
        .await?;

        Ok(log_sinker)
    }

    /// Iceberg commits are coordinated across parallel writers.
    fn is_coordinated_sink(&self) -> bool {
        true
    }

    /// Build the commit coordinator. Two-phase commit is used when
    /// `is_exactly_once` is enabled; single-phase otherwise.
    async fn new_coordinator(
        &self,
        iceberg_compact_stat_sender: Option<UnboundedSender<IcebergSinkCompactionUpdate>>,
    ) -> Result<SinkCommitCoordinator> {
        let catalog = self.config.create_catalog().await?;
        let table = self.create_and_validate_table().await?;
        let coordinator = IcebergSinkCommitter {
            catalog,
            table,
            last_commit_epoch: 0,
            sink_id: self.param.sink_id,
            config: self.config.clone(),
            param: self.param.clone(),
            commit_retry_num: self.config.commit_retry_num,
            iceberg_compact_stat_sender,
        };
        if self.config.is_exactly_once.unwrap_or_default() {
            Ok(SinkCommitCoordinator::TwoPhase(Box::new(coordinator)))
        } else {
            Ok(SinkCommitCoordinator::SinglePhase(Box::new(coordinator)))
        }
    }
}
1181
/// Tracks how (and whether) data chunks must be projected before writing.
///
/// - `None`: no projection is needed.
/// - `Prepare(idx)`: holds the extra partition column index; the projection
///   has not been computed yet.
/// - `Done(vec)`: the resolved projection index vector.
///
/// The `ProjectIdxVec` is late-evaluated: when the `Prepare` state is first
/// encountered, the data chunk schema is used to create the projection index
/// vector.
enum ProjectIdxVec {
    None,
    Prepare(usize),
    Done(Vec<usize>),
}
1193
// Convenience aliases for the deeply-nested writer generics below. All are
// specialized to parquet output with the default location/file-name generators.

// Builder for plain data-file writers.
type DataFileWriterBuilderType =
    DataFileWriterBuilder<ParquetWriterBuilder, DefaultLocationGenerator, DefaultFileNameGenerator>;
// Builder for position-delete file writers (parquet-based deletes).
type PositionDeleteFileWriterBuilderType = PositionDeleteFileWriterBuilder<
    ParquetWriterBuilder,
    DefaultLocationGenerator,
    DefaultFileNameGenerator,
>;
// Position-delete file writer (parquet-based deletes).
type PositionDeleteFileWriterType = PositionDeleteFileWriter<
    ParquetWriterBuilder,
    DefaultLocationGenerator,
    DefaultFileNameGenerator,
>;
// Builder for deletion-vector writers (puffin files, used on format v3+ tables).
type DeletionVectorWriterBuilderType =
    DeletionVectorWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>;
// Deletion-vector writer (puffin files, used on format v3+ tables).
type DeletionVectorWriterType =
    DeletionVectorWriter<DefaultLocationGenerator, DefaultFileNameGenerator>;
// Builder for equality-delete file writers.
type EqualityDeleteFileWriterBuilderType = EqualityDeleteFileWriterBuilder<
    ParquetWriterBuilder,
    DefaultLocationGenerator,
    DefaultFileNameGenerator,
>;
1215
/// Selects which kind of position-delete writer to build: classic parquet
/// position-delete files, or puffin deletion vectors (chosen for tables with
/// format version >= 3).
#[derive(Clone)]
enum PositionDeleteWriterBuilderType {
    PositionDelete(PositionDeleteFileWriterBuilderType),
    DeletionVector(DeletionVectorWriterBuilderType),
}
1221
/// The concrete position-delete writer built from
/// [`PositionDeleteWriterBuilderType`]; mirrors its two variants.
enum PositionDeleteWriterType {
    PositionDelete(PositionDeleteFileWriterType),
    DeletionVector(DeletionVectorWriterType),
}
1226
1227#[async_trait]
1228impl IcebergWriterBuilder<Vec<PositionDeleteInput>> for PositionDeleteWriterBuilderType {
1229    type R = PositionDeleteWriterType;
1230
1231    async fn build(
1232        &self,
1233        partition_key: Option<iceberg::spec::PartitionKey>,
1234    ) -> iceberg::Result<Self::R> {
1235        match self {
1236            PositionDeleteWriterBuilderType::PositionDelete(builder) => Ok(
1237                PositionDeleteWriterType::PositionDelete(builder.build(partition_key).await?),
1238            ),
1239            PositionDeleteWriterBuilderType::DeletionVector(builder) => Ok(
1240                PositionDeleteWriterType::DeletionVector(builder.build(partition_key).await?),
1241            ),
1242        }
1243    }
1244}
1245
1246#[async_trait]
1247impl IcebergWriter<Vec<PositionDeleteInput>> for PositionDeleteWriterType {
1248    async fn write(&mut self, input: Vec<PositionDeleteInput>) -> iceberg::Result<()> {
1249        match self {
1250            PositionDeleteWriterType::PositionDelete(writer) => writer.write(input).await,
1251            PositionDeleteWriterType::DeletionVector(writer) => writer.write(input).await,
1252        }
1253    }
1254
1255    async fn close(&mut self) -> iceberg::Result<Vec<DataFile>> {
1256        match self {
1257            PositionDeleteWriterType::PositionDelete(writer) => writer.close().await,
1258            PositionDeleteWriterType::DeletionVector(writer) => writer.close().await,
1259        }
1260    }
1261}
1262
/// Cheaply-cloneable wrapper that shares a single writer builder via `Arc`.
#[derive(Clone)]
struct SharedIcebergWriterBuilder<B>(Arc<B>);
1265
1266#[async_trait]
1267impl<B: IcebergWriterBuilder> IcebergWriterBuilder for SharedIcebergWriterBuilder<B> {
1268    type R = B::R;
1269
1270    async fn build(
1271        &self,
1272        partition_key: Option<iceberg::spec::PartitionKey>,
1273    ) -> iceberg::Result<Self::R> {
1274        self.0.build(partition_key).await
1275    }
1276}
1277
/// Bundles a shared writer builder with the table schema and partition spec
/// needed to construct partition-aware task writers.
#[derive(Clone)]
struct TaskWriterBuilderWrapper<B: IcebergWriterBuilder> {
    // The wrapped builder, shared so every built task writer can reuse it.
    inner: Arc<B>,
    // Whether fanout writing is enabled (set by callers based on whether the
    // table has partition fields).
    fanout_enabled: bool,
    // Current iceberg table schema.
    schema: IcebergSchemaRef,
    // Default partition spec of the table.
    partition_spec: PartitionSpecRef,
    // If true, partition values are computed from the record batch; otherwise
    // they are taken from precomputed columns.
    compute_partition: bool,
}
1286
1287impl<B: IcebergWriterBuilder> TaskWriterBuilderWrapper<B> {
1288    fn new(
1289        inner: B,
1290        fanout_enabled: bool,
1291        schema: IcebergSchemaRef,
1292        partition_spec: PartitionSpecRef,
1293        compute_partition: bool,
1294    ) -> Self {
1295        Self {
1296            inner: Arc::new(inner),
1297            fanout_enabled,
1298            schema,
1299            partition_spec,
1300            compute_partition,
1301        }
1302    }
1303
1304    fn build(&self) -> iceberg::Result<TaskWriter<SharedIcebergWriterBuilder<B>>> {
1305        let partition_splitter = match (
1306            self.partition_spec.is_unpartitioned(),
1307            self.compute_partition,
1308        ) {
1309            (true, _) => None,
1310            (false, true) => Some(RecordBatchPartitionSplitter::try_new_with_computed_values(
1311                self.schema.clone(),
1312                self.partition_spec.clone(),
1313            )?),
1314            (false, false) => Some(
1315                RecordBatchPartitionSplitter::try_new_with_precomputed_values(
1316                    self.schema.clone(),
1317                    self.partition_spec.clone(),
1318                )?,
1319            ),
1320        };
1321
1322        Ok(TaskWriter::new_with_partition_splitter(
1323            SharedIcebergWriterBuilder(self.inner.clone()),
1324            self.fanout_enabled,
1325            self.schema.clone(),
1326            self.partition_spec.clone(),
1327            partition_splitter,
1328        ))
1329    }
1330}
1331
/// The iceberg sink writer. `Created` holds the raw construction arguments;
/// `Initialized` holds the fully-built writer state.
pub enum IcebergSinkWriter {
    Created(IcebergSinkWriterArgs),
    Initialized(IcebergSinkWriterInner),
}
1336
/// Arguments captured at sink creation time, from which the actual writer
/// state is built later.
pub struct IcebergSinkWriterArgs {
    config: IcebergConfig,
    sink_param: SinkParam,
    writer_param: SinkWriterParam,
    // Column ids of the primary-key columns for upsert sinks (used as
    // equality-delete keys); `None` for append-only sinks.
    unique_column_ids: Option<Vec<usize>>,
}
1343
/// Fully-initialized state of the iceberg sink writer.
pub struct IcebergSinkWriterInner {
    // Dispatches between the append-only and upsert writer variants.
    writer: IcebergWriterDispatch,
    // Arrow schema derived from the table's current iceberg schema.
    arrow_schema: SchemaRef,
    // See comments below
    metrics: IcebergWriterMetrics,
    // State of iceberg table for this writer
    table: Table,
    // For chunk with extra partition column, we should remove this column before write.
    // This project index vec is used to avoid create project idx each time.
    project_idx_vec: ProjectIdxVec,
    // When set, `should_commit_on_checkpoint` forces a commit once this many
    // uncommitted bytes have accumulated.
    commit_checkpoint_size_threshold_bytes: Option<u64>,
    // Bytes written but not yet committed; compared against the threshold above.
    uncommitted_write_bytes: u64,
}
1357
/// Holds the active writer (and the builder used to create it) for the two
/// sink modes.
#[allow(clippy::type_complexity)]
enum IcebergWriterDispatch {
    // Append-only mode: writes data files only.
    Append {
        // `None` until a writer is built from `writer_builder`.
        writer: Option<Box<dyn IcebergWriter>>,
        writer_builder:
            TaskWriterBuilderWrapper<MonitoredGeneralWriterBuilder<DataFileWriterBuilderType>>,
    },
    // Upsert mode: a delta writer combining data files, position deletes (or
    // deletion vectors) and equality deletes.
    Upsert {
        // `None` until a writer is built from `writer_builder`.
        writer: Option<Box<dyn IcebergWriter>>,
        writer_builder: TaskWriterBuilderWrapper<
            MonitoredGeneralWriterBuilder<
                DeltaWriterBuilder<
                    DataFileWriterBuilderType,
                    PositionDeleteWriterBuilderType,
                    EqualityDeleteFileWriterBuilderType,
                >,
            >,
        >,
        // Arrow schema extended with the op column consumed by the delta writer.
        arrow_schema_with_op_column: SchemaRef,
    },
}
1379
1380impl IcebergWriterDispatch {
1381    pub fn get_writer(&mut self) -> Option<&mut Box<dyn IcebergWriter>> {
1382        match self {
1383            IcebergWriterDispatch::Append { writer, .. }
1384            | IcebergWriterDispatch::Upsert { writer, .. } => writer.as_mut(),
1385        }
1386    }
1387}
1388
/// Metric handles owned by the sink writer.
pub struct IcebergWriterMetrics {
    // NOTE: These 2 metrics are not used directly by us, but only kept for lifecycle management.
    // They are actually used in `PrometheusWriterBuilder`:
    //     WriterMetrics::new(write_qps.deref().clone(), write_latency.deref().clone())
    // We keep them here so the guards clean their labels from the metrics registry when dropped.
    _write_qps: LabelGuardedIntCounter,
    _write_latency: LabelGuardedHistogram,
    // Counter of bytes written by this sink writer.
    write_bytes: LabelGuardedIntCounter,
}
1398
1399impl IcebergSinkWriter {
1400    pub fn new(
1401        config: IcebergConfig,
1402        sink_param: SinkParam,
1403        writer_param: SinkWriterParam,
1404        unique_column_ids: Option<Vec<usize>>,
1405    ) -> Self {
1406        Self::Created(IcebergSinkWriterArgs {
1407            config,
1408            sink_param,
1409            writer_param,
1410            unique_column_ids,
1411        })
1412    }
1413}
1414
1415impl IcebergSinkWriterInner {
1416    fn should_commit_on_checkpoint(&self) -> bool {
1417        self.commit_checkpoint_size_threshold_bytes
1418            .is_some_and(|threshold| {
1419                self.uncommitted_write_bytes > 0 && self.uncommitted_write_bytes >= threshold
1420            })
1421    }
1422
    /// Build the writer state for an append-only sink: a monitored, rolling
    /// parquet data-file writer wrapped in a partition-aware task writer.
    ///
    /// Registers the per-(actor, sink id, sink name) write metrics and eagerly
    /// builds the initial inner writer.
    fn build_append_only(
        config: &IcebergConfig,
        table: Table,
        writer_param: &SinkWriterParam,
    ) -> Result<Self> {
        let SinkWriterParam {
            extra_partition_col_idx,
            actor_id,
            sink_id,
            sink_name,
            ..
        } = writer_param;
        // Label set shared by all metrics registered below.
        let metrics_labels = [
            &actor_id.to_string(),
            &sink_id.to_string(),
            sink_name.as_str(),
        ];

        // Metrics
        let write_qps = GLOBAL_SINK_METRICS
            .iceberg_write_qps
            .with_guarded_label_values(&metrics_labels);
        let write_latency = GLOBAL_SINK_METRICS
            .iceberg_write_latency
            .with_guarded_label_values(&metrics_labels);
        // # TODO
        // Unused. Add this metrics later.
        let _rolling_unflushed_data_file = GLOBAL_SINK_METRICS
            .iceberg_rolling_unflushed_data_file
            .with_guarded_label_values(&metrics_labels);
        let write_bytes = GLOBAL_SINK_METRICS
            .iceberg_write_bytes
            .with_guarded_label_values(&metrics_labels);

        let schema = table.metadata().current_schema();
        let partition_spec = table.metadata().default_partition_spec();
        // Fanout writing is only needed when the table is partitioned.
        let fanout_enabled = !partition_spec.fields().is_empty();
        // To avoid duplicate file name, each time the sink created will generate a unique uuid as file name suffix.
        let unique_uuid_suffix = Uuid::now_v7();

        let parquet_writer_properties = WriterProperties::builder()
            .set_compression(config.get_parquet_compression())
            .set_max_row_group_size(config.write_parquet_max_row_group_rows())
            .set_created_by(PARQUET_CREATED_BY.to_owned())
            .build();

        // Rolling writer: starts a new parquet file once the configured target
        // file size (in MB) is reached.
        let parquet_writer_builder =
            ParquetWriterBuilder::new(parquet_writer_properties, schema.clone());
        let rolling_builder = RollingFileWriterBuilder::new(
            parquet_writer_builder,
            (config.target_file_size_mb() * 1024 * 1024) as usize,
            table.file_io().clone(),
            DefaultLocationGenerator::new(table.metadata().clone())
                .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
            DefaultFileNameGenerator::new(
                writer_param.actor_id.to_string(),
                Some(unique_uuid_suffix.to_string()),
                iceberg::spec::DataFileFormat::Parquet,
            ),
        );
        let data_file_builder = DataFileWriterBuilder::new(rolling_builder);
        // Wrap with QPS/latency monitoring, then with partition-aware task writing.
        let monitored_builder = MonitoredGeneralWriterBuilder::new(
            data_file_builder,
            write_qps.clone(),
            write_latency.clone(),
        );
        let writer_builder = TaskWriterBuilderWrapper::new(
            monitored_builder,
            fanout_enabled,
            schema.clone(),
            partition_spec.clone(),
            true,
        );
        // Build the initial writer eagerly so the sink is ready to accept data.
        let inner_writer = Some(Box::new(
            writer_builder
                .build()
                .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
        ) as Box<dyn IcebergWriter>);
        Ok(Self {
            arrow_schema: Arc::new(
                schema_to_arrow_schema(table.metadata().current_schema())
                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
            ),
            metrics: IcebergWriterMetrics {
                _write_qps: write_qps,
                _write_latency: write_latency,
                write_bytes,
            },
            writer: IcebergWriterDispatch::Append {
                writer: inner_writer,
                writer_builder,
            },
            table,
            project_idx_vec: {
                // Late-evaluated projection: resolved on the first chunk if an
                // extra partition column must be stripped before writing.
                if let Some(extra_partition_col_idx) = extra_partition_col_idx {
                    ProjectIdxVec::Prepare(*extra_partition_col_idx)
                } else {
                    ProjectIdxVec::None
                }
            },
            commit_checkpoint_size_threshold_bytes: config.commit_checkpoint_size_threshold_bytes(),
            uncommitted_write_bytes: 0,
        })
    }
1527
1528    fn build_upsert(
1529        config: &IcebergConfig,
1530        table: Table,
1531        unique_column_ids: Vec<usize>,
1532        writer_param: &SinkWriterParam,
1533    ) -> Result<Self> {
1534        let SinkWriterParam {
1535            extra_partition_col_idx,
1536            actor_id,
1537            sink_id,
1538            sink_name,
1539            ..
1540        } = writer_param;
1541        let metrics_labels = [
1542            &actor_id.to_string(),
1543            &sink_id.to_string(),
1544            sink_name.as_str(),
1545        ];
1546        let unique_column_ids: Vec<_> = unique_column_ids.into_iter().map(|id| id as i32).collect();
1547
1548        // Metrics
1549        let write_qps = GLOBAL_SINK_METRICS
1550            .iceberg_write_qps
1551            .with_guarded_label_values(&metrics_labels);
1552        let write_latency = GLOBAL_SINK_METRICS
1553            .iceberg_write_latency
1554            .with_guarded_label_values(&metrics_labels);
1555        // # TODO
1556        // Unused. Add this metrics later.
1557        let _rolling_unflushed_data_file = GLOBAL_SINK_METRICS
1558            .iceberg_rolling_unflushed_data_file
1559            .with_guarded_label_values(&metrics_labels);
1560        let write_bytes = GLOBAL_SINK_METRICS
1561            .iceberg_write_bytes
1562            .with_guarded_label_values(&metrics_labels);
1563
1564        // Determine the schema id and partition spec id
1565        let schema = table.metadata().current_schema();
1566        let partition_spec = table.metadata().default_partition_spec();
1567        let fanout_enabled = !partition_spec.fields().is_empty();
1568        let use_deletion_vectors = table.metadata().format_version() >= FormatVersion::V3;
1569
1570        // To avoid duplicate file name, each time the sink created will generate a unique uuid as file name suffix.
1571        let unique_uuid_suffix = Uuid::now_v7();
1572
1573        let parquet_writer_properties = WriterProperties::builder()
1574            .set_compression(config.get_parquet_compression())
1575            .set_max_row_group_size(config.write_parquet_max_row_group_rows())
1576            .set_created_by(PARQUET_CREATED_BY.to_owned())
1577            .build();
1578
1579        let data_file_builder = {
1580            let parquet_writer_builder =
1581                ParquetWriterBuilder::new(parquet_writer_properties.clone(), schema.clone());
1582            let rolling_writer_builder = RollingFileWriterBuilder::new(
1583                parquet_writer_builder,
1584                (config.target_file_size_mb() * 1024 * 1024) as usize,
1585                table.file_io().clone(),
1586                DefaultLocationGenerator::new(table.metadata().clone())
1587                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1588                DefaultFileNameGenerator::new(
1589                    writer_param.actor_id.to_string(),
1590                    Some(unique_uuid_suffix.to_string()),
1591                    iceberg::spec::DataFileFormat::Parquet,
1592                ),
1593            );
1594            DataFileWriterBuilder::new(rolling_writer_builder)
1595        };
1596        let position_delete_builder = if use_deletion_vectors {
1597            let location_generator = DefaultLocationGenerator::new(table.metadata().clone())
1598                .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
1599            PositionDeleteWriterBuilderType::DeletionVector(DeletionVectorWriterBuilder::new(
1600                table.file_io().clone(),
1601                location_generator,
1602                DefaultFileNameGenerator::new(
1603                    writer_param.actor_id.to_string(),
1604                    Some(format!("delvec-{}", unique_uuid_suffix)),
1605                    iceberg::spec::DataFileFormat::Puffin,
1606                ),
1607            ))
1608        } else {
1609            let parquet_writer_builder = ParquetWriterBuilder::new(
1610                parquet_writer_properties.clone(),
1611                POSITION_DELETE_SCHEMA.clone().into(),
1612            );
1613            let rolling_writer_builder = RollingFileWriterBuilder::new(
1614                parquet_writer_builder,
1615                (config.target_file_size_mb() * 1024 * 1024) as usize,
1616                table.file_io().clone(),
1617                DefaultLocationGenerator::new(table.metadata().clone())
1618                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1619                DefaultFileNameGenerator::new(
1620                    writer_param.actor_id.to_string(),
1621                    Some(format!("pos-del-{}", unique_uuid_suffix)),
1622                    iceberg::spec::DataFileFormat::Parquet,
1623                ),
1624            );
1625            PositionDeleteWriterBuilderType::PositionDelete(PositionDeleteFileWriterBuilder::new(
1626                rolling_writer_builder,
1627            ))
1628        };
1629        let equality_delete_builder = {
1630            let eq_del_config = EqualityDeleteWriterConfig::new(
1631                unique_column_ids.clone(),
1632                table.metadata().current_schema().clone(),
1633            )
1634            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
1635            let parquet_writer_builder = ParquetWriterBuilder::new(
1636                parquet_writer_properties,
1637                Arc::new(
1638                    arrow_schema_to_schema(eq_del_config.projected_arrow_schema_ref())
1639                        .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1640                ),
1641            );
1642            let rolling_writer_builder = RollingFileWriterBuilder::new(
1643                parquet_writer_builder,
1644                (config.target_file_size_mb() * 1024 * 1024) as usize,
1645                table.file_io().clone(),
1646                DefaultLocationGenerator::new(table.metadata().clone())
1647                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1648                DefaultFileNameGenerator::new(
1649                    writer_param.actor_id.to_string(),
1650                    Some(format!("eq-del-{}", unique_uuid_suffix)),
1651                    iceberg::spec::DataFileFormat::Parquet,
1652                ),
1653            );
1654
1655            EqualityDeleteFileWriterBuilder::new(rolling_writer_builder, eq_del_config)
1656        };
1657        let delta_builder = DeltaWriterBuilder::new(
1658            data_file_builder,
1659            position_delete_builder,
1660            equality_delete_builder,
1661            unique_column_ids,
1662            schema.clone(),
1663        );
1664        let original_arrow_schema = Arc::new(
1665            schema_to_arrow_schema(table.metadata().current_schema())
1666                .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1667        );
1668        let schema_with_extra_op_column = {
1669            let mut new_fields = original_arrow_schema.fields().iter().cloned().collect_vec();
1670            new_fields.push(Arc::new(ArrowField::new(
1671                "op".to_owned(),
1672                ArrowDataType::Int32,
1673                false,
1674            )));
1675            Arc::new(ArrowSchema::new(new_fields))
1676        };
1677        let writer_builder = TaskWriterBuilderWrapper::new(
1678            MonitoredGeneralWriterBuilder::new(
1679                delta_builder,
1680                write_qps.clone(),
1681                write_latency.clone(),
1682            ),
1683            fanout_enabled,
1684            schema.clone(),
1685            partition_spec.clone(),
1686            true,
1687        );
1688        let inner_writer = Some(Box::new(
1689            writer_builder
1690                .build()
1691                .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1692        ) as Box<dyn IcebergWriter>);
1693        Ok(Self {
1694            arrow_schema: original_arrow_schema,
1695            metrics: IcebergWriterMetrics {
1696                _write_qps: write_qps,
1697                _write_latency: write_latency,
1698                write_bytes,
1699            },
1700            table,
1701            writer: IcebergWriterDispatch::Upsert {
1702                writer: inner_writer,
1703                writer_builder,
1704                arrow_schema_with_op_column: schema_with_extra_op_column,
1705            },
1706            project_idx_vec: {
1707                if let Some(extra_partition_col_idx) = extra_partition_col_idx {
1708                    ProjectIdxVec::Prepare(*extra_partition_col_idx)
1709                } else {
1710                    ProjectIdxVec::None
1711                }
1712            },
1713            commit_checkpoint_size_threshold_bytes: config.commit_checkpoint_size_threshold_bytes(),
1714            uncommitted_write_bytes: 0,
1715        })
1716    }
1717}
1718
#[async_trait]
impl SinkWriter for IcebergSinkWriter {
    /// Per-checkpoint metadata handed to the commit coordinator; `None` when
    /// there is nothing to commit for the epoch.
    type CommitMetadata = Option<SinkMetadata>;

    /// Begin a new epoch
    async fn begin_epoch(&mut self, _epoch: u64) -> Result<()> {
        // Only the first `begin_epoch` after creation does real work: it loads
        // and validates the iceberg table, builds the inner writer, and flips
        // the state to `Initialized`. All later epochs are no-ops.
        let Self::Created(args) = self else {
            return Ok(());
        };

        let table = create_and_validate_table_impl(&args.config, &args.sink_param).await?;
        // Presence of `unique_column_ids` selects upsert mode (equality/position
        // deletes); otherwise the sink is append-only.
        let inner = match &args.unique_column_ids {
            Some(unique_column_ids) => IcebergSinkWriterInner::build_upsert(
                &args.config,
                table,
                unique_column_ids.clone(),
                &args.writer_param,
            )?,
            None => {
                IcebergSinkWriterInner::build_append_only(&args.config, table, &args.writer_param)?
            }
        };

        *self = IcebergSinkWriter::Initialized(inner);
        Ok(())
    }

    /// Write a stream chunk to sink
    async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> {
        let Self::Initialized(inner) = self else {
            unreachable!("IcebergSinkWriter should be initialized before barrier");
        };

        // Try to build writer if it's None.
        // (The writer is taken/closed by `barrier`, so it may need to be rebuilt
        // lazily here for the first write of a new checkpoint interval.)
        match &mut inner.writer {
            IcebergWriterDispatch::Append {
                writer,
                writer_builder,
            } => {
                if writer.is_none() {
                    *writer = Some(Box::new(
                        writer_builder
                            .build()
                            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
                    ));
                }
            }
            IcebergWriterDispatch::Upsert {
                writer,
                writer_builder,
                ..
            } => {
                if writer.is_none() {
                    *writer = Some(Box::new(
                        writer_builder
                            .build()
                            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
                    ));
                }
            }
        };

        // Process the chunk.
        let (mut chunk, ops) = chunk.compact_vis().into_parts();
        // Drop the extra partition column (if configured) before conversion.
        // The projection index vector is computed once on first use and cached
        // in the `Done` variant for subsequent chunks.
        match &mut inner.project_idx_vec {
            ProjectIdxVec::None => {}
            ProjectIdxVec::Prepare(idx) => {
                if *idx >= chunk.columns().len() {
                    return Err(SinkError::Iceberg(anyhow!(
                        "invalid extra partition column index {}",
                        idx
                    )));
                }
                let project_idx_vec = (0..*idx)
                    .chain(*idx + 1..chunk.columns().len())
                    .collect_vec();
                chunk = chunk.project(&project_idx_vec);
                inner.project_idx_vec = ProjectIdxVec::Done(project_idx_vec);
            }
            ProjectIdxVec::Done(idx_vec) => {
                chunk = chunk.project(idx_vec);
            }
        }
        // Nothing visible to write after compaction — skip the conversion.
        if ops.is_empty() {
            return Ok(());
        }
        let write_batch_size = chunk.estimated_heap_size();
        let batch = match &inner.writer {
            IcebergWriterDispatch::Append { .. } => {
                // separate out insert chunk
                let filters =
                    chunk.visibility() & ops.iter().map(|op| *op == Op::Insert).collect::<Bitmap>();
                chunk.set_visibility(filters);
                IcebergArrowConvert
                    .to_record_batch(inner.arrow_schema.clone(), &chunk.compact_vis())
                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?
            }
            IcebergWriterDispatch::Upsert {
                arrow_schema_with_op_column,
                ..
            } => {
                let chunk = IcebergArrowConvert
                    .to_record_batch(inner.arrow_schema.clone(), &chunk)
                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
                // Append an extra int32 "op" column so the delta writer can
                // distinguish inserts from deletes per row.
                let ops = Arc::new(Int32Array::from(
                    ops.iter()
                        .map(|op| match op {
                            Op::UpdateInsert | Op::Insert => INSERT_OP,
                            Op::UpdateDelete | Op::Delete => DELETE_OP,
                        })
                        .collect_vec(),
                ));
                let mut columns = chunk.columns().to_vec();
                columns.push(ops);
                RecordBatch::try_new(arrow_schema_with_op_column.clone(), columns)
                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?
            }
        };

        // The writer was (re)built above, so `get_writer` cannot be `None` here.
        let writer = inner.writer.get_writer().unwrap();
        writer
            .write(batch)
            .instrument_await("iceberg_write")
            .await
            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
        // Track written bytes both for metrics and for the size-threshold-based
        // commit decision (`should_commit_on_checkpoint`).
        inner.metrics.write_bytes.inc_by(write_batch_size as _);
        inner.uncommitted_write_bytes = inner
            .uncommitted_write_bytes
            .saturating_add(write_batch_size as u64);
        Ok(())
    }

    fn should_commit_on_checkpoint(&self) -> bool {
        // A writer that hasn't been initialized yet has nothing to commit.
        match self {
            Self::Initialized(inner) => inner.should_commit_on_checkpoint(),
            Self::Created(_) => false,
        }
    }

    /// Receive a barrier and mark the end of current epoch. When `is_checkpoint` is true, the sink
    /// writer should commit the current epoch.
    async fn barrier(&mut self, is_checkpoint: bool) -> Result<Option<SinkMetadata>> {
        let Self::Initialized(inner) = self else {
            unreachable!("IcebergSinkWriter should be initialized before barrier");
        };

        // Skip it if not checkpoint
        if !is_checkpoint {
            return Ok(None);
        }

        // Close the current writer to flush its data files, then eagerly build a
        // fresh writer for the next checkpoint interval.
        let close_result = match &mut inner.writer {
            IcebergWriterDispatch::Append {
                writer,
                writer_builder,
            } => {
                let close_result = match writer.take() {
                    Some(mut writer) => {
                        Some(writer.close().instrument_await("iceberg_close").await)
                    }
                    _ => None,
                };
                match writer_builder.build() {
                    Ok(new_writer) => {
                        *writer = Some(Box::new(new_writer));
                    }
                    _ => {
                        // In this case, the writer is closed and we can't build a new writer. But we can't return the error
                        // here because current writer may close successfully. So we just log the error.
                        warn!("Failed to build new writer after close");
                    }
                }
                close_result
            }
            IcebergWriterDispatch::Upsert {
                writer,
                writer_builder,
                ..
            } => {
                let close_result = match writer.take() {
                    Some(mut writer) => {
                        Some(writer.close().instrument_await("iceberg_close").await)
                    }
                    _ => None,
                };
                match writer_builder.build() {
                    Ok(new_writer) => {
                        *writer = Some(Box::new(new_writer));
                    }
                    _ => {
                        // In this case, the writer is closed and we can't build a new writer. But we can't return the error
                        // here because current writer may close successfully. So we just log the error.
                        warn!("Failed to build new writer after close");
                    }
                }
                close_result
            }
        };

        match close_result {
            Some(Ok(result)) => {
                // Data files are flushed; reset the uncommitted-bytes counter.
                inner.uncommitted_write_bytes = 0;
                let format_version = inner.table.metadata().format_version();
                let partition_type = inner.table.metadata().default_partition_type();
                let data_files = result
                    .into_iter()
                    .map(|f| {
                        // Truncate large column statistics BEFORE serialization
                        let truncated = truncate_datafile(f);
                        SerializedDataFile::try_from(truncated, partition_type, format_version)
                            .map_err(|err| SinkError::Iceberg(anyhow!(err)))
                    })
                    .collect::<Result<Vec<_>>>()?;
                // Ship the serialized data files plus the schema/partition-spec
                // ids so the coordinator can validate them at commit time.
                Ok(Some(SinkMetadata::try_from(&IcebergCommitResult {
                    data_files,
                    schema_id: inner.table.metadata().current_schema_id(),
                    partition_spec_id: inner.table.metadata().default_partition_spec_id(),
                })?))
            }
            Some(Err(err)) => Err(SinkError::Iceberg(anyhow!(err))),
            None => Err(SinkError::Iceberg(anyhow!("No writer to close"))),
        }
    }
}
1943
/// JSON key for the schema id in serialized iceberg sink metadata.
const SCHEMA_ID: &str = "schema_id";
/// JSON key for the partition spec id in serialized iceberg sink metadata.
const PARTITION_SPEC_ID: &str = "partition_spec_id";
/// JSON key for the list of serialized data files in iceberg sink metadata.
const DATA_FILES: &str = "data_files";

/// Maximum size for column statistics (min/max values) in bytes.
/// Column statistics larger than this will be truncated to avoid metadata bloat.
/// This is especially important for large fields like JSONB, TEXT, BINARY, etc.
///
/// Fix for large column statistics in `DataFile` metadata that can cause OOM errors.
/// We truncate at the `DataFile` level (before serialization) by directly modifying
/// the public `lower_bounds` and `upper_bounds` fields.
///
/// This prevents metadata from ballooning to gigabytes when dealing with large
/// JSONB, TEXT, or BINARY fields, while still preserving statistics for small fields
/// that benefit from query optimization.
const MAX_COLUMN_STAT_SIZE: usize = 10240; // 10KB
1960
1961/// Truncate large column statistics from `DataFile` BEFORE serialization.
1962///
1963/// This function directly modifies `DataFile`'s `lower_bounds` and `upper_bounds`
1964/// to remove entries that exceed `MAX_COLUMN_STAT_SIZE`.
1965///
1966/// # Arguments
1967/// * `data_file` - A `DataFile` to process
1968///
1969/// # Returns
1970/// The modified `DataFile` with large statistics truncated
1971fn truncate_datafile(mut data_file: DataFile) -> DataFile {
1972    // Process lower_bounds - remove entries with large values
1973    data_file.lower_bounds.retain(|field_id, datum| {
1974        // Use to_bytes() to get the actual binary size without JSON serialization overhead
1975        let size = match datum.to_bytes() {
1976            Ok(bytes) => bytes.len(),
1977            Err(_) => 0,
1978        };
1979
1980        if size > MAX_COLUMN_STAT_SIZE {
1981            tracing::debug!(
1982                field_id = field_id,
1983                size = size,
1984                "Truncating large lower_bound statistic"
1985            );
1986            return false;
1987        }
1988        true
1989    });
1990
1991    // Process upper_bounds - remove entries with large values
1992    data_file.upper_bounds.retain(|field_id, datum| {
1993        // Use to_bytes() to get the actual binary size without JSON serialization overhead
1994        let size = match datum.to_bytes() {
1995            Ok(bytes) => bytes.len(),
1996            Err(_) => 0,
1997        };
1998
1999        if size > MAX_COLUMN_STAT_SIZE {
2000            tracing::debug!(
2001                field_id = field_id,
2002                size = size,
2003                "Truncating large upper_bound statistic"
2004            );
2005            return false;
2006        }
2007        true
2008    });
2009
2010    data_file
2011}
2012
/// Write result of one sink parallelism for a single checkpoint, exchanged
/// between sink writers and the commit coordinator via serialized metadata.
#[derive(Default, Clone)]
struct IcebergCommitResult {
    // Schema id the data files were written against; the coordinator rejects a
    // commit when the table's current schema id differs.
    schema_id: i32,
    // Partition spec id used by the writer; must match the table's default
    // partition spec at commit time.
    partition_spec_id: i32,
    // Data files produced in this checkpoint, in serialized form.
    data_files: Vec<SerializedDataFile>,
}
2019
2020impl IcebergCommitResult {
2021    fn try_from(value: &SinkMetadata) -> Result<Self> {
2022        if let Some(Serialized(v)) = &value.metadata {
2023            let mut values = if let serde_json::Value::Object(v) =
2024                serde_json::from_slice::<serde_json::Value>(&v.metadata)
2025                    .context("Can't parse iceberg sink metadata")?
2026            {
2027                v
2028            } else {
2029                bail!("iceberg sink metadata should be an object");
2030            };
2031
2032            let schema_id;
2033            if let Some(serde_json::Value::Number(value)) = values.remove(SCHEMA_ID) {
2034                schema_id = value
2035                    .as_u64()
2036                    .ok_or_else(|| anyhow!("schema_id should be a u64"))?;
2037            } else {
2038                bail!("iceberg sink metadata should have schema_id");
2039            }
2040
2041            let partition_spec_id;
2042            if let Some(serde_json::Value::Number(value)) = values.remove(PARTITION_SPEC_ID) {
2043                partition_spec_id = value
2044                    .as_u64()
2045                    .ok_or_else(|| anyhow!("partition_spec_id should be a u64"))?;
2046            } else {
2047                bail!("iceberg sink metadata should have partition_spec_id");
2048            }
2049
2050            let data_files: Vec<SerializedDataFile>;
2051            if let serde_json::Value::Array(values) = values
2052                .remove(DATA_FILES)
2053                .ok_or_else(|| anyhow!("iceberg sink metadata should have data_files object"))?
2054            {
2055                data_files = values
2056                    .into_iter()
2057                    .map(from_value::<SerializedDataFile>)
2058                    .collect::<std::result::Result<_, _>>()
2059                    .unwrap();
2060            } else {
2061                bail!("iceberg sink metadata should have data_files object");
2062            }
2063
2064            Ok(Self {
2065                schema_id: schema_id as i32,
2066                partition_spec_id: partition_spec_id as i32,
2067                data_files,
2068            })
2069        } else {
2070            bail!("Can't create iceberg sink write result from empty data!")
2071        }
2072    }
2073
2074    fn try_from_serialized_bytes(value: Vec<u8>) -> Result<Self> {
2075        let mut values = if let serde_json::Value::Object(value) =
2076            serde_json::from_slice::<serde_json::Value>(&value)
2077                .context("Can't parse iceberg sink metadata")?
2078        {
2079            value
2080        } else {
2081            bail!("iceberg sink metadata should be an object");
2082        };
2083
2084        let schema_id;
2085        if let Some(serde_json::Value::Number(value)) = values.remove(SCHEMA_ID) {
2086            schema_id = value
2087                .as_u64()
2088                .ok_or_else(|| anyhow!("schema_id should be a u64"))?;
2089        } else {
2090            bail!("iceberg sink metadata should have schema_id");
2091        }
2092
2093        let partition_spec_id;
2094        if let Some(serde_json::Value::Number(value)) = values.remove(PARTITION_SPEC_ID) {
2095            partition_spec_id = value
2096                .as_u64()
2097                .ok_or_else(|| anyhow!("partition_spec_id should be a u64"))?;
2098        } else {
2099            bail!("iceberg sink metadata should have partition_spec_id");
2100        }
2101
2102        let data_files: Vec<SerializedDataFile>;
2103        if let serde_json::Value::Array(values) = values
2104            .remove(DATA_FILES)
2105            .ok_or_else(|| anyhow!("iceberg sink metadata should have data_files object"))?
2106        {
2107            data_files = values
2108                .into_iter()
2109                .map(from_value::<SerializedDataFile>)
2110                .collect::<std::result::Result<_, _>>()
2111                .unwrap();
2112        } else {
2113            bail!("iceberg sink metadata should have data_files object");
2114        }
2115
2116        Ok(Self {
2117            schema_id: schema_id as i32,
2118            partition_spec_id: partition_spec_id as i32,
2119            data_files,
2120        })
2121    }
2122}
2123
2124impl<'a> TryFrom<&'a IcebergCommitResult> for SinkMetadata {
2125    type Error = SinkError;
2126
2127    fn try_from(value: &'a IcebergCommitResult) -> std::result::Result<SinkMetadata, Self::Error> {
2128        let json_data_files = serde_json::Value::Array(
2129            value
2130                .data_files
2131                .iter()
2132                .map(serde_json::to_value)
2133                .collect::<std::result::Result<Vec<serde_json::Value>, _>>()
2134                .context("Can't serialize data files to json")?,
2135        );
2136        let json_value = serde_json::Value::Object(
2137            vec![
2138                (
2139                    SCHEMA_ID.to_owned(),
2140                    serde_json::Value::Number(value.schema_id.into()),
2141                ),
2142                (
2143                    PARTITION_SPEC_ID.to_owned(),
2144                    serde_json::Value::Number(value.partition_spec_id.into()),
2145                ),
2146                (DATA_FILES.to_owned(), json_data_files),
2147            ]
2148            .into_iter()
2149            .collect(),
2150        );
2151        Ok(SinkMetadata {
2152            metadata: Some(Serialized(SerializedMetadata {
2153                metadata: serde_json::to_vec(&json_value)
2154                    .context("Can't serialize iceberg sink metadata")?,
2155            })),
2156        })
2157    }
2158}
2159
2160impl TryFrom<IcebergCommitResult> for Vec<u8> {
2161    type Error = SinkError;
2162
2163    fn try_from(value: IcebergCommitResult) -> std::result::Result<Vec<u8>, Self::Error> {
2164        let json_data_files = serde_json::Value::Array(
2165            value
2166                .data_files
2167                .iter()
2168                .map(serde_json::to_value)
2169                .collect::<std::result::Result<Vec<serde_json::Value>, _>>()
2170                .context("Can't serialize data files to json")?,
2171        );
2172        let json_value = serde_json::Value::Object(
2173            vec![
2174                (
2175                    SCHEMA_ID.to_owned(),
2176                    serde_json::Value::Number(value.schema_id.into()),
2177                ),
2178                (
2179                    PARTITION_SPEC_ID.to_owned(),
2180                    serde_json::Value::Number(value.partition_spec_id.into()),
2181                ),
2182                (DATA_FILES.to_owned(), json_data_files),
2183            ]
2184            .into_iter()
2185            .collect(),
2186        );
2187        Ok(serde_json::to_vec(&json_value).context("Can't serialize iceberg sink metadata")?)
2188    }
2189}
/// Commit coordinator for the iceberg sink: collects write results from all
/// sink writers and commits them to the iceberg table.
pub struct IcebergSinkCommitter {
    // Catalog used to (re)load the table before committing.
    catalog: Arc<dyn Catalog>,
    table: Table,
    pub last_commit_epoch: u64,
    pub(crate) sink_id: SinkId,
    pub(crate) config: IcebergConfig,
    pub(crate) param: SinkParam,
    // Number of retries for the iceberg commit operation.
    commit_retry_num: u32,
    // NOTE(review): presumably notifies the compaction scheduler after commits
    // (per the type name) — confirm against the usage sites.
    pub(crate) iceberg_compact_stat_sender: Option<UnboundedSender<IcebergSinkCompactionUpdate>>,
}
2200
2201impl IcebergSinkCommitter {
2202    // Reload table and guarantee current schema_id and partition_spec_id matches
2203    // given `schema_id` and `partition_spec_id`
2204    async fn reload_table(
2205        catalog: &dyn Catalog,
2206        table_ident: &TableIdent,
2207        schema_id: i32,
2208        partition_spec_id: i32,
2209    ) -> Result<Table> {
2210        let table = catalog
2211            .load_table(table_ident)
2212            .await
2213            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
2214        if table.metadata().current_schema_id() != schema_id {
2215            return Err(SinkError::Iceberg(anyhow!(
2216                "Schema evolution not supported, expect schema id {}, but got {}",
2217                schema_id,
2218                table.metadata().current_schema_id()
2219            )));
2220        }
2221        if table.metadata().default_partition_spec_id() != partition_spec_id {
2222            return Err(SinkError::Iceberg(anyhow!(
2223                "Partition evolution not supported, expect partition spec id {}, but got {}",
2224                partition_spec_id,
2225                table.metadata().default_partition_spec_id()
2226            )));
2227        }
2228        Ok(table)
2229    }
2230}
2231
#[async_trait]
impl SinglePhaseCommitCoordinator for IcebergSinkCommitter {
    async fn init(&mut self) -> Result<()> {
        // No state to restore in single-phase mode; just log the startup.
        tracing::info!(
            sink_id = %self.param.sink_id,
            "Iceberg sink coordinator initialized",
        );

        Ok(())
    }

    /// Single-phase commit: decode the writers' metadata and commit directly,
    /// without persisting a pre-commit record first.
    async fn commit_data(&mut self, epoch: u64, metadata: Vec<SinkMetadata>) -> Result<()> {
        tracing::debug!("Starting iceberg direct commit in epoch {epoch}");

        if metadata.is_empty() {
            tracing::debug!(?epoch, "No datafile to commit");
            return Ok(());
        }

        // Commit data if present
        // (`pre_commit_inner` returns `None` when all write results are empty.)
        if let Some((write_results, snapshot_id)) = self.pre_commit_inner(epoch, metadata)? {
            self.commit_data_impl(epoch, write_results, snapshot_id)
                .await?;
        }

        Ok(())
    }

    async fn commit_schema_change(
        &mut self,
        epoch: u64,
        schema_change: PbSinkSchemaChange,
    ) -> Result<()> {
        tracing::info!(
            "Committing schema change {:?} in epoch {}",
            schema_change,
            epoch
        );
        self.commit_schema_change_impl(schema_change).await?;
        tracing::info!("Successfully committed schema change in epoch {}", epoch);

        Ok(())
    }
}
2276
2277#[async_trait]
2278impl TwoPhaseCommitCoordinator for IcebergSinkCommitter {
2279    async fn init(&mut self) -> Result<()> {
2280        tracing::info!(
2281            sink_id = %self.param.sink_id,
2282            "Iceberg sink coordinator initialized",
2283        );
2284
2285        Ok(())
2286    }
2287
2288    async fn pre_commit(
2289        &mut self,
2290        epoch: u64,
2291        metadata: Vec<SinkMetadata>,
2292        _schema_change: Option<PbSinkSchemaChange>,
2293    ) -> Result<Option<Vec<u8>>> {
2294        tracing::debug!("Starting iceberg pre commit in epoch {epoch}");
2295
2296        let (write_results, snapshot_id) = match self.pre_commit_inner(epoch, metadata)? {
2297            Some((write_results, snapshot_id)) => (write_results, snapshot_id),
2298            None => {
2299                tracing::debug!(?epoch, "no data to pre commit");
2300                return Ok(None);
2301            }
2302        };
2303
2304        let mut write_results_bytes = Vec::new();
2305        for each_parallelism_write_result in write_results {
2306            let each_parallelism_write_result_bytes: Vec<u8> =
2307                each_parallelism_write_result.try_into()?;
2308            write_results_bytes.push(each_parallelism_write_result_bytes);
2309        }
2310
2311        let snapshot_id_bytes: Vec<u8> = snapshot_id.to_le_bytes().to_vec();
2312        write_results_bytes.push(snapshot_id_bytes);
2313
2314        let pre_commit_metadata_bytes: Vec<u8> = serialize_metadata(write_results_bytes);
2315        Ok(Some(pre_commit_metadata_bytes))
2316    }
2317
2318    async fn commit_data(&mut self, epoch: u64, commit_metadata: Vec<u8>) -> Result<()> {
2319        tracing::debug!("Starting iceberg commit in epoch {epoch}");
2320
2321        if commit_metadata.is_empty() {
2322            tracing::debug!(?epoch, "No datafile to commit");
2323            return Ok(());
2324        }
2325
2326        // Deserialize commit metadata
2327        let mut payload = deserialize_metadata(commit_metadata);
2328        if payload.is_empty() {
2329            return Err(SinkError::Iceberg(anyhow!(
2330                "Invalid commit metadata: empty payload"
2331            )));
2332        }
2333
2334        // Last element is snapshot_id
2335        let snapshot_id_bytes = payload.pop().ok_or_else(|| {
2336            SinkError::Iceberg(anyhow!("Invalid commit metadata: missing snapshot_id"))
2337        })?;
2338        let snapshot_id = i64::from_le_bytes(
2339            snapshot_id_bytes
2340                .try_into()
2341                .map_err(|_| SinkError::Iceberg(anyhow!("Invalid snapshot id bytes")))?,
2342        );
2343
2344        // Remaining elements are write_results
2345        let write_results = payload
2346            .into_iter()
2347            .map(IcebergCommitResult::try_from_serialized_bytes)
2348            .collect::<Result<Vec<_>>>()?;
2349
2350        let snapshot_committed = self
2351            .is_snapshot_id_in_iceberg(&self.config, snapshot_id)
2352            .await?;
2353
2354        if snapshot_committed {
2355            tracing::info!(
2356                "Snapshot id {} already committed in iceberg table, skip committing again.",
2357                snapshot_id
2358            );
2359            return Ok(());
2360        }
2361
2362        self.commit_data_impl(epoch, write_results, snapshot_id)
2363            .await
2364    }
2365
2366    async fn commit_schema_change(
2367        &mut self,
2368        epoch: u64,
2369        schema_change: PbSinkSchemaChange,
2370    ) -> Result<()> {
2371        let schema_updated = self.check_schema_change_applied(&schema_change)?;
2372        if schema_updated {
2373            tracing::info!("Schema change already committed in epoch {}, skip", epoch);
2374            return Ok(());
2375        }
2376
2377        tracing::info!(
2378            "Committing schema change {:?} in epoch {}",
2379            schema_change,
2380            epoch
2381        );
2382        self.commit_schema_change_impl(schema_change).await?;
2383        tracing::info!("Successfully committed schema change in epoch {epoch}");
2384
2385        Ok(())
2386    }
2387
    /// Abort an epoch whose pre-committed metadata will never be committed.
    ///
    /// Currently a no-op: data files that were already written for this epoch
    /// are left behind as orphan files (see the TODO below).
    async fn abort(&mut self, _epoch: u64, _commit_metadata: Vec<u8>) {
        // TODO: Files that have been written but not committed should be deleted.
        tracing::debug!("Abort not implemented yet");
    }
2392}
2393
/// Methods required to achieve exactly-once semantics.
2395impl IcebergSinkCommitter {
2396    fn pre_commit_inner(
2397        &mut self,
2398        _epoch: u64,
2399        metadata: Vec<SinkMetadata>,
2400    ) -> Result<Option<(Vec<IcebergCommitResult>, i64)>> {
2401        let write_results: Vec<IcebergCommitResult> = metadata
2402            .iter()
2403            .map(IcebergCommitResult::try_from)
2404            .collect::<Result<Vec<IcebergCommitResult>>>()?;
2405
2406        // Skip if no data to commit
2407        if write_results.is_empty() || write_results.iter().all(|r| r.data_files.is_empty()) {
2408            return Ok(None);
2409        }
2410
2411        let expect_schema_id = write_results[0].schema_id;
2412        let expect_partition_spec_id = write_results[0].partition_spec_id;
2413
2414        // guarantee that all write results has same schema_id and partition_spec_id
2415        if write_results
2416            .iter()
2417            .any(|r| r.schema_id != expect_schema_id)
2418            || write_results
2419                .iter()
2420                .any(|r| r.partition_spec_id != expect_partition_spec_id)
2421        {
2422            return Err(SinkError::Iceberg(anyhow!(
2423                "schema_id and partition_spec_id should be the same in all write results"
2424            )));
2425        }
2426
2427        let snapshot_id = FastAppendAction::generate_snapshot_id(&self.table);
2428
2429        Ok(Some((write_results, snapshot_id)))
2430    }
2431
    /// Commit `write_results` to the iceberg table as one fast-append snapshot
    /// using the pre-allocated `snapshot_id`, retrying transient commit
    /// failures with exponential backoff.
    async fn commit_data_impl(
        &mut self,
        epoch: u64,
        write_results: Vec<IcebergCommitResult>,
        snapshot_id: i64,
    ) -> Result<()> {
        // Empty write results should be handled before calling this function.
        assert!(
            !write_results.is_empty() && !write_results.iter().all(|r| r.data_files.is_empty())
        );

        // Check snapshot limit before proceeding with commit
        self.wait_for_snapshot_limit().await?;

        let expect_schema_id = write_results[0].schema_id;
        let expect_partition_spec_id = write_results[0].partition_spec_id;

        // Load the latest table to avoid concurrent modification with the best effort.
        self.table = Self::reload_table(
            self.catalog.as_ref(),
            self.table.identifier(),
            expect_schema_id,
            expect_partition_spec_id,
        )
        .await?;

        let Some(schema) = self.table.metadata().schema_by_id(expect_schema_id) else {
            return Err(SinkError::Iceberg(anyhow!(
                "Can't find schema by id {}",
                expect_schema_id
            )));
        };
        let Some(partition_spec) = self
            .table
            .metadata()
            .partition_spec_by_id(expect_partition_spec_id)
        else {
            return Err(SinkError::Iceberg(anyhow!(
                "Can't find partition spec by id {}",
                expect_partition_spec_id
            )));
        };
        let partition_type = partition_spec
            .as_ref()
            .clone()
            .partition_type(schema)
            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;

        // Rehydrate every serialized data file against the resolved partition
        // type / schema before handing them to the transaction.
        let data_files = write_results
            .into_iter()
            .flat_map(|r| {
                r.data_files.into_iter().map(|f| {
                    f.try_into(expect_partition_spec_id, &partition_type, schema)
                        .map_err(|err| SinkError::Iceberg(anyhow!(err)))
                })
            })
            .collect::<Result<Vec<DataFile>>>()?;
        // TODO: This retry behavior should be reverted and implemented in iceberg-rust
        // once it supports retry (tracked in: https://github.com/apache/iceberg-rust/issues/964),
        // because proper retry logic involves re-applying the commit metadata.
        // For now, we just retry the whole commit operation.
        let retry_strategy = ExponentialBackoff::from_millis(10)
            .max_delay(Duration::from_secs(60))
            .map(jitter)
            .take(self.commit_retry_num as usize);
        let catalog = self.catalog.clone();
        let table_ident = self.table.identifier().clone();

        // Custom retry logic that:
        // 1. Calls reload_table before each commit attempt to get the latest metadata
        // 2. If reload_table fails (table not exists/schema/partition mismatch), stops retrying immediately
        // 3. If commit fails, retries with backoff
        enum CommitError {
            ReloadTable(SinkError), // Non-retriable: schema/partition mismatch
            Commit(SinkError),      // Retriable: commit conflicts, network errors
        }

        let table = RetryIf::spawn(
            retry_strategy,
            || async {
                // Reload table before each commit attempt to get the latest metadata
                let table = Self::reload_table(
                    catalog.as_ref(),
                    &table_ident,
                    expect_schema_id,
                    expect_partition_spec_id,
                )
                .await
                .map_err(|e| {
                    tracing::error!(error = %e.as_report(), "Failed to reload iceberg table");
                    CommitError::ReloadTable(e)
                })?;

                // Fast-append the data files to the configured target branch
                // under the pre-allocated snapshot id.
                let txn = Transaction::new(&table);
                let append_action = txn
                    .fast_append()
                    .set_snapshot_id(snapshot_id)
                    .set_target_branch(commit_branch(
                        self.config.r#type.as_str(),
                        self.config.write_mode,
                    ))
                    .add_data_files(data_files.clone());

                let tx = append_action.apply(txn).map_err(|err| {
                    let err: IcebergError = err.into();
                    tracing::error!(error = %err.as_report(), "Failed to apply iceberg table");
                    CommitError::Commit(SinkError::Iceberg(anyhow!(err)))
                })?;

                tx.commit(catalog.as_ref()).await.map_err(|err| {
                    let err: IcebergError = err.into();
                    tracing::error!(error = %err.as_report(), "Failed to commit iceberg table");
                    CommitError::Commit(SinkError::Iceberg(anyhow!(err)))
                })
            },
            |err: &CommitError| {
                // Only retry on commit errors, not on reload_table errors
                match err {
                    CommitError::Commit(_) => {
                        tracing::warn!("Commit failed, will retry");
                        true
                    }
                    CommitError::ReloadTable(_) => {
                        tracing::error!(
                            "reload_table failed with non-retriable error, will not retry"
                        );
                        false
                    }
                }
            },
        )
        .await
        .map_err(|e| match e {
            CommitError::ReloadTable(e) | CommitError::Commit(e) => e,
        })?;
        self.table = table;

        // Export the post-commit snapshot count so operators can watch table growth.
        let snapshot_num = self.table.metadata().snapshots().count();
        let catalog_name = self.config.common.catalog_name();
        let table_name = self.table.identifier().to_string();
        let metrics_labels = [&self.param.sink_name, &catalog_name, &table_name];
        GLOBAL_SINK_METRICS
            .iceberg_snapshot_num
            .with_guarded_label_values(&metrics_labels)
            .set(snapshot_num as i64);

        tracing::debug!("Succeeded to commit to iceberg table in epoch {epoch}.");

        // Nudge the compaction scheduler (best-effort: a lost message only
        // delays compaction, so a send failure is merely logged).
        if let Some(iceberg_compact_stat_sender) = &self.iceberg_compact_stat_sender
            && self.config.enable_compaction
            && iceberg_compact_stat_sender
                .send(IcebergSinkCompactionUpdate {
                    sink_id: self.sink_id,
                    compaction_interval: self.config.compaction_interval_sec(),
                    force_compaction: false,
                })
                .is_err()
        {
            warn!("failed to send iceberg compaction stats");
        }

        Ok(())
    }
2595
2596    /// During pre-commit metadata, we record the `snapshot_id` corresponding to each batch of files.
2597    /// Therefore, the logic for checking whether all files in this batch are present in Iceberg
2598    /// has been changed to verifying if their corresponding `snapshot_id` exists in Iceberg.
2599    async fn is_snapshot_id_in_iceberg(
2600        &self,
2601        iceberg_config: &IcebergConfig,
2602        snapshot_id: i64,
2603    ) -> Result<bool> {
2604        let table = iceberg_config.load_table().await?;
2605        if table.metadata().snapshot_by_id(snapshot_id).is_some() {
2606            Ok(true)
2607        } else {
2608            Ok(false)
2609        }
2610    }
2611
    /// Check if the specified columns already exist in the iceberg table's current schema.
    /// This is used to determine if schema change has already been applied.
    ///
    /// Returns `Ok(false)` when the current schema still equals the change's
    /// `original_schema`, `Ok(true)` when it equals `original_schema` plus the
    /// added columns, and an error when it matches neither (ambiguous state).
    fn check_schema_change_applied(&self, schema_change: &PbSinkSchemaChange) -> Result<bool> {
        let current_schema = self.table.metadata().current_schema();
        let current_arrow_schema = schema_to_arrow_schema(current_schema.as_ref())
            .context("Failed to convert schema")
            .map_err(SinkError::Iceberg)?;

        let iceberg_arrow_convert = IcebergArrowConvert;

        // A schema "matches" when field counts are equal and every expected
        // field has a same-named, same-typed counterpart; field order is
        // intentionally ignored.
        let schema_matches = |expected: &[ArrowField]| {
            if current_arrow_schema.fields().len() != expected.len() {
                return false;
            }

            expected.iter().all(|expected_field| {
                current_arrow_schema.fields().iter().any(|current_field| {
                    current_field.name() == expected_field.name()
                        && current_field.data_type() == expected_field.data_type()
                })
            })
        };

        // Convert the change's pre-change schema into Arrow fields so it can be
        // compared against the table's current Arrow schema.
        let original_arrow_fields: Vec<ArrowField> = schema_change
            .original_schema
            .iter()
            .map(|pb_field| {
                let field = Field::from(pb_field);
                iceberg_arrow_convert
                    .to_arrow_field(&field.name, &field.data_type)
                    .context("Failed to convert field to arrow")
                    .map_err(SinkError::Iceberg)
            })
            .collect::<Result<_>>()?;

        // If current schema equals original_schema, then schema change is NOT applied.
        if schema_matches(&original_arrow_fields) {
            tracing::debug!(
                "Current iceberg schema matches original_schema ({} columns); schema change not applied",
                original_arrow_fields.len()
            );
            return Ok(false);
        }

        // We only support add_columns for now.
        let Some(risingwave_pb::stream_plan::sink_schema_change::Op::AddColumns(add_columns_op)) =
            schema_change.op.as_ref()
        else {
            return Err(SinkError::Iceberg(anyhow!(
                "Unsupported sink schema change op in iceberg sink: {:?}",
                schema_change.op
            )));
        };

        let add_arrow_fields: Vec<ArrowField> = add_columns_op
            .fields
            .iter()
            .map(|pb_field| {
                let field = Field::from(pb_field);
                iceberg_arrow_convert
                    .to_arrow_field(&field.name, &field.data_type)
                    .context("Failed to convert field to arrow")
                    .map_err(SinkError::Iceberg)
            })
            .collect::<Result<_>>()?;

        let mut expected_after_change = original_arrow_fields;
        expected_after_change.extend(add_arrow_fields);

        // If current schema equals original_schema + add_columns, then schema change is applied.
        if schema_matches(&expected_after_change) {
            tracing::debug!(
                "Current iceberg schema matches original_schema + add_columns ({} columns); schema change already applied",
                expected_after_change.len()
            );
            return Ok(true);
        }

        // Neither shape matched: refuse to guess rather than risk double-applying
        // or silently skipping the change.
        Err(SinkError::Iceberg(anyhow!(
            "Current iceberg schema does not match either original_schema ({} cols) or original_schema + add_columns; cannot determine whether schema change is applied",
            schema_change.original_schema.len()
        )))
    }
2695
    /// Commit schema changes (e.g., add columns) to the iceberg table.
    /// This function uses Transaction API to atomically update the table schema
    /// with optimistic locking to prevent concurrent conflicts.
    ///
    /// Only the `AddColumns` op is supported; each added column becomes an
    /// optional field with a freshly assigned field id.
    async fn commit_schema_change_impl(&mut self, schema_change: PbSinkSchemaChange) -> Result<()> {
        use iceberg::spec::NestedField;

        // Only column additions are supported for now.
        let Some(risingwave_pb::stream_plan::sink_schema_change::Op::AddColumns(add_columns_op)) =
            schema_change.op.as_ref()
        else {
            return Err(SinkError::Iceberg(anyhow!(
                "Unsupported sink schema change op in iceberg sink: {:?}",
                schema_change.op
            )));
        };

        let add_columns = add_columns_op.fields.iter().map(Field::from).collect_vec();

        // Step 1: Get current table metadata
        let metadata = self.table.metadata();
        // New fields get ids after the highest column id ever assigned.
        let mut next_field_id = metadata.last_column_id() + 1;
        tracing::debug!("Starting schema change, next_field_id: {}", next_field_id);

        // Step 2: Build new fields to add
        let iceberg_create_table_arrow_convert = IcebergCreateTableArrowConvert::default();
        let mut new_fields = Vec::new();

        for field in &add_columns {
            // Convert RisingWave Field to Arrow Field using IcebergCreateTableArrowConvert
            let arrow_field = iceberg_create_table_arrow_convert
                .to_arrow_field(&field.name, &field.data_type)
                .with_context(|| format!("Failed to convert field '{}' to arrow", field.name))
                .map_err(SinkError::Iceberg)?;

            // Convert Arrow DataType to Iceberg Type
            let iceberg_type = iceberg::arrow::arrow_type_to_type(arrow_field.data_type())
                .map_err(|err| {
                    SinkError::Iceberg(
                        anyhow!(err).context("Failed to convert Arrow type to Iceberg type"),
                    )
                })?;

            // Create NestedField with the next available field ID.
            // Added columns are optional so existing data files remain readable.
            let nested_field = Arc::new(NestedField::optional(
                next_field_id,
                &field.name,
                iceberg_type,
            ));

            new_fields.push(nested_field);
            tracing::info!("Prepared field '{}' with ID {}", field.name, next_field_id);
            next_field_id += 1;
        }

        // Step 3: Create Transaction with UpdateSchemaAction
        tracing::info!(
            "Committing schema change to catalog for table {}",
            self.table.identifier()
        );

        let txn = Transaction::new(&self.table);
        let action = txn.update_schema().add_fields(new_fields);

        let updated_table = action
            .apply(txn)
            .context("Failed to apply schema update action")
            .map_err(SinkError::Iceberg)?
            .commit(self.catalog.as_ref())
            .await
            .context("Failed to commit table schema change")
            .map_err(SinkError::Iceberg)?;

        // Keep the cached table in sync with what the catalog just accepted.
        self.table = updated_table;

        tracing::info!(
            "Successfully committed schema change, added {} columns to iceberg table",
            add_columns.len()
        );

        Ok(())
    }
2776
2777    /// Check if the number of snapshots since the last rewrite/overwrite operation exceeds the limit
2778    /// Returns the number of snapshots since the last rewrite/overwrite
2779    fn count_snapshots_since_rewrite(&self) -> usize {
2780        let mut snapshots: Vec<_> = self.table.metadata().snapshots().collect();
2781        snapshots.sort_by_key(|b| std::cmp::Reverse(b.timestamp_ms()));
2782
2783        // Iterate through snapshots in reverse order (newest first) to find the last rewrite/overwrite
2784        let mut count = 0;
2785        for snapshot in snapshots {
2786            // Check if this snapshot represents a rewrite or overwrite operation
2787            let summary = snapshot.summary();
2788            match &summary.operation {
2789                Operation::Replace => {
2790                    // Found a rewrite/overwrite operation, stop counting
2791                    break;
2792                }
2793
2794                _ => {
2795                    // Increment count for each snapshot that is not a rewrite/overwrite
2796                    count += 1;
2797                }
2798            }
2799        }
2800
2801        count
2802    }
2803
    /// Wait until snapshot count since last rewrite is below the limit
    ///
    /// Only applies when compaction is enabled and
    /// `max_snapshots_num_before_compaction` is configured; otherwise returns
    /// immediately. While over the limit, it requests a forced compaction
    /// (best-effort) and re-checks the table every 30 seconds.
    async fn wait_for_snapshot_limit(&mut self) -> Result<()> {
        if !self.config.enable_compaction {
            return Ok(());
        }

        if let Some(max_snapshots) = self.config.max_snapshots_num_before_compaction {
            loop {
                let current_count = self.count_snapshots_since_rewrite();

                if current_count < max_snapshots {
                    tracing::info!(
                        "Snapshot count check passed: {} < {}",
                        current_count,
                        max_snapshots
                    );
                    break;
                }

                tracing::info!(
                    "Snapshot count {} exceeds limit {}, waiting...",
                    current_count,
                    max_snapshots
                );

                // Ask the compaction scheduler to compact ASAP; a lost message
                // only delays progress, so a send failure is just logged.
                if let Some(iceberg_compact_stat_sender) = &self.iceberg_compact_stat_sender
                    && iceberg_compact_stat_sender
                        .send(IcebergSinkCompactionUpdate {
                            sink_id: self.sink_id,
                            compaction_interval: self.config.compaction_interval_sec(),
                            force_compaction: true,
                        })
                        .is_err()
                {
                    tracing::warn!("failed to send iceberg compaction stats");
                }

                // Wait for 30 seconds before checking again
                tokio::time::sleep(Duration::from_secs(30)).await;

                // Reload table to get latest snapshots
                self.table = self.config.load_table().await?;
            }
        }
        Ok(())
    }
2850}
2851
// Child-field names used when flattening a map type's key/value entries into
// `schema_fields` (see `get_fields`); these match Arrow's conventional
// "key"/"value" entry names.
const MAP_KEY: &str = "key";
const MAP_VALUE: &str = "value";
2854
2855fn get_fields<'a>(
2856    our_field_type: &'a risingwave_common::types::DataType,
2857    data_type: &ArrowDataType,
2858    schema_fields: &mut HashMap<&'a str, &'a risingwave_common::types::DataType>,
2859) -> Option<ArrowFields> {
2860    match data_type {
2861        ArrowDataType::Struct(fields) => {
2862            match our_field_type {
2863                risingwave_common::types::DataType::Struct(struct_fields) => {
2864                    struct_fields.iter().for_each(|(name, data_type)| {
2865                        let res = schema_fields.insert(name, data_type);
2866                        // This assert is to make sure there is no duplicate field name in the schema.
2867                        assert!(res.is_none())
2868                    });
2869                }
2870                risingwave_common::types::DataType::Map(map_fields) => {
2871                    schema_fields.insert(MAP_KEY, map_fields.key());
2872                    schema_fields.insert(MAP_VALUE, map_fields.value());
2873                }
2874                risingwave_common::types::DataType::List(list) => {
2875                    list.elem()
2876                        .as_struct()
2877                        .iter()
2878                        .for_each(|(name, data_type)| {
2879                            let res = schema_fields.insert(name, data_type);
2880                            // This assert is to make sure there is no duplicate field name in the schema.
2881                            assert!(res.is_none())
2882                        });
2883                }
2884                _ => {}
2885            };
2886            Some(fields.clone())
2887        }
2888        ArrowDataType::List(field) | ArrowDataType::Map(field, _) => {
2889            get_fields(our_field_type, field.data_type(), schema_fields)
2890        }
2891        _ => None, // not a supported complex type and unlikely to show up
2892    }
2893}
2894
2895fn check_compatibility(
2896    schema_fields: HashMap<&str, &risingwave_common::types::DataType>,
2897    fields: &ArrowFields,
2898) -> anyhow::Result<bool> {
2899    for arrow_field in fields {
2900        let our_field_type = schema_fields
2901            .get(arrow_field.name().as_str())
2902            .ok_or_else(|| anyhow!("Field {} not found in our schema", arrow_field.name()))?;
2903
2904        // Iceberg source should be able to read iceberg decimal type.
2905        let converted_arrow_data_type = IcebergArrowConvert
2906            .to_arrow_field("", our_field_type)
2907            .map_err(|e| anyhow!(e))?
2908            .data_type()
2909            .clone();
2910
2911        let compatible = match (&converted_arrow_data_type, arrow_field.data_type()) {
2912            (ArrowDataType::Decimal128(_, _), ArrowDataType::Decimal128(_, _)) => true,
2913            (ArrowDataType::Binary, ArrowDataType::LargeBinary) => true,
2914            (ArrowDataType::LargeBinary, ArrowDataType::Binary) => true,
2915            (ArrowDataType::List(_), ArrowDataType::List(field))
2916            | (ArrowDataType::Map(_, _), ArrowDataType::Map(field, _)) => {
2917                let mut schema_fields = HashMap::new();
2918                get_fields(our_field_type, field.data_type(), &mut schema_fields)
2919                    .is_none_or(|fields| check_compatibility(schema_fields, &fields).unwrap())
2920            }
2921            // validate nested structs
2922            (ArrowDataType::Struct(_), ArrowDataType::Struct(fields)) => {
2923                let mut schema_fields = HashMap::new();
2924                our_field_type
2925                    .as_struct()
2926                    .iter()
2927                    .for_each(|(name, data_type)| {
2928                        let res = schema_fields.insert(name, data_type);
2929                        // This assert is to make sure there is no duplicate field name in the schema.
2930                        assert!(res.is_none())
2931                    });
2932                check_compatibility(schema_fields, fields)?
2933            }
2934            // cases where left != right (metadata, field name mismatch)
2935            //
2936            // all nested types: in iceberg `field_id` will always be present, but RW doesn't have it:
2937            // {"PARQUET:field_id": ".."}
2938            //
2939            // map: The standard name in arrow is "entries", "key", "value".
2940            // in iceberg-rs, it's called "key_value"
2941            (left, right) => left.equals_datatype(right),
2942        };
2943        if !compatible {
2944            bail!(
2945                "field {}'s type is incompatible\nRisingWave converted data type: {}\niceberg's data type: {}",
2946                arrow_field.name(),
2947                converted_arrow_data_type,
2948                arrow_field.data_type()
2949            );
2950        }
2951    }
2952    Ok(true)
2953}
2954
2955/// Try to match our schema with iceberg schema.
2956pub fn try_matches_arrow_schema(rw_schema: &Schema, arrow_schema: &ArrowSchema) -> Result<()> {
2957    if rw_schema.fields.len() != arrow_schema.fields().len() {
2958        bail!(
2959            "Schema length mismatch, risingwave is {}, and iceberg is {}",
2960            rw_schema.fields.len(),
2961            arrow_schema.fields.len()
2962        );
2963    }
2964
2965    let mut schema_fields = HashMap::new();
2966    rw_schema.fields.iter().for_each(|field| {
2967        let res = schema_fields.insert(field.name.as_str(), &field.data_type);
2968        // This assert is to make sure there is no duplicate field name in the schema.
2969        assert!(res.is_none())
2970    });
2971
2972    check_compatibility(schema_fields, &arrow_schema.fields)?;
2973    Ok(())
2974}
2975
2976fn serialize_metadata(metadata: Vec<Vec<u8>>) -> Vec<u8> {
2977    serde_json::to_vec(&metadata).unwrap()
2978}
2979
2980fn deserialize_metadata(bytes: Vec<u8>) -> Vec<Vec<u8>> {
2981    serde_json::from_slice(&bytes).unwrap()
2982}
2983
/// Parse a comma-separated `partition_by` expression string into
/// `(column_name, transform)` pairs.
///
/// Accepted forms per entry: `column`, `transform(column)`,
/// `transform(n,column)` and `transform(n, column)`. The numeric `n` is
/// required by the `bucket` and `truncate` transforms and is re-encoded as
/// `bucket[n]` / `truncate[n]` for `Transform::from_str`.
pub fn parse_partition_by_exprs(
    expr: String,
) -> std::result::Result<Vec<(String, Transform)>, anyhow::Error> {
    // captures column, transform(column), transform(n,column), transform(n, column)
    // NOTE(review): `is_match` only requires one word-like token somewhere in
    // `expr`, so malformed trailing text is silently ignored rather than
    // rejected — confirm this leniency is intended.
    let re = Regex::new(r"(?<transform>\w+)(\(((?<n>\d+)?(?:,|(,\s)))?(?<field>\w+)\))?").unwrap();
    if !re.is_match(&expr) {
        bail!(format!(
            "Invalid partition fields: {}\nHINT: Supported formats are column, transform(column), transform(n,column), transform(n, column)",
            expr
        ))
    }
    let caps = re.captures_iter(&expr);

    let mut partition_columns = vec![];

    for mat in caps {
        // A bare word (no `n`, no parenthesized field) is a plain column name
        // partitioned by identity.
        let (column, transform) = if mat.name("n").is_none() && mat.name("field").is_none() {
            (&mat["transform"], Transform::Identity)
        } else {
            let mut func = mat["transform"].to_owned();
            // `bucket`/`truncate` carry their parameter in brackets, e.g. `bucket[16]`.
            if func == "bucket" || func == "truncate" {
                let n = &mat
                    .name("n")
                    .ok_or_else(|| anyhow!("The `n` must be set with `bucket` and `truncate`"))?
                    .as_str();
                func = format!("{func}[{n}]");
            }
            (
                &mat["field"],
                Transform::from_str(&func)
                    .with_context(|| format!("invalid transform function {}", func))?,
            )
        };
        partition_columns.push((column.to_owned(), transform));
    }
    Ok(partition_columns)
}
3021
3022pub fn commit_branch(sink_type: &str, write_mode: IcebergWriteMode) -> String {
3023    if should_enable_iceberg_cow(sink_type, write_mode) {
3024        ICEBERG_COW_BRANCH.to_owned()
3025    } else {
3026        MAIN_BRANCH.to_owned()
3027    }
3028}
3029
3030pub fn should_enable_iceberg_cow(sink_type: &str, write_mode: IcebergWriteMode) -> bool {
3031    sink_type == SINK_TYPE_UPSERT && write_mode == IcebergWriteMode::CopyOnWrite
3032}
3033
// Empty marker impls of the `WithOptions` trait — presumably these allow the
// enums to be used as values in sink `WITH (...)` options; confirm against
// `crate::with_options`.
impl crate::with_options::WithOptions for IcebergWriteMode {}

impl crate::with_options::WithOptions for FormatVersion {}

impl crate::with_options::WithOptions for CompactionType {}
3039
3040#[cfg(test)]
3041mod test {
3042    use std::collections::BTreeMap;
3043
3044    use risingwave_common::array::arrow::arrow_schema_iceberg::FieldRef as ArrowFieldRef;
3045    use risingwave_common::types::{DataType, MapType, StructType};
3046
3047    use crate::connector_common::{IcebergCommon, IcebergTableIdentifier};
3048    use crate::sink::decouple_checkpoint_log_sink::ICEBERG_DEFAULT_COMMIT_CHECKPOINT_INTERVAL;
3049    use crate::sink::iceberg::{
3050        COMMIT_CHECKPOINT_SIZE_THRESHOLD_MB, COMPACTION_INTERVAL_SEC, COMPACTION_MAX_SNAPSHOTS_NUM,
3051        CompactionType, ENABLE_COMPACTION, ENABLE_SNAPSHOT_EXPIRATION, FormatVersion,
3052        ICEBERG_DEFAULT_COMMIT_CHECKPOINT_SIZE_THRESHOLD_MB, IcebergConfig, IcebergWriteMode,
3053        SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_FILES, SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_META_DATA,
3054        SNAPSHOT_EXPIRATION_MAX_AGE_MILLIS, SNAPSHOT_EXPIRATION_RETAIN_LAST, WRITE_MODE,
3055    };
3056
3057    pub const DEFAULT_ICEBERG_COMPACTION_INTERVAL: u64 = 3600; // 1 hour
3058
3059    #[test]
3060    fn test_compatible_arrow_schema() {
3061        use arrow_schema_iceberg::{DataType as ArrowDataType, Field as ArrowField};
3062
3063        use super::*;
3064        let risingwave_schema = Schema::new(vec![
3065            Field::with_name(DataType::Int32, "a"),
3066            Field::with_name(DataType::Int32, "b"),
3067            Field::with_name(DataType::Int32, "c"),
3068        ]);
3069        let arrow_schema = ArrowSchema::new(vec![
3070            ArrowField::new("a", ArrowDataType::Int32, false),
3071            ArrowField::new("b", ArrowDataType::Int32, false),
3072            ArrowField::new("c", ArrowDataType::Int32, false),
3073        ]);
3074
3075        try_matches_arrow_schema(&risingwave_schema, &arrow_schema).unwrap();
3076
3077        let risingwave_schema = Schema::new(vec![
3078            Field::with_name(DataType::Int32, "d"),
3079            Field::with_name(DataType::Int32, "c"),
3080            Field::with_name(DataType::Int32, "a"),
3081            Field::with_name(DataType::Int32, "b"),
3082        ]);
3083        let arrow_schema = ArrowSchema::new(vec![
3084            ArrowField::new("a", ArrowDataType::Int32, false),
3085            ArrowField::new("b", ArrowDataType::Int32, false),
3086            ArrowField::new("d", ArrowDataType::Int32, false),
3087            ArrowField::new("c", ArrowDataType::Int32, false),
3088        ]);
3089        try_matches_arrow_schema(&risingwave_schema, &arrow_schema).unwrap();
3090
3091        let risingwave_schema = Schema::new(vec![
3092            Field::with_name(
3093                DataType::Struct(StructType::new(vec![
3094                    ("a1", DataType::Int32),
3095                    (
3096                        "a2",
3097                        DataType::Struct(StructType::new(vec![
3098                            ("a21", DataType::Bytea),
3099                            (
3100                                "a22",
3101                                DataType::Map(MapType::from_kv(DataType::Varchar, DataType::Jsonb)),
3102                            ),
3103                        ])),
3104                    ),
3105                ])),
3106                "a",
3107            ),
3108            Field::with_name(
3109                DataType::list(DataType::Struct(StructType::new(vec![
3110                    ("b1", DataType::Int32),
3111                    ("b2", DataType::Bytea),
3112                    (
3113                        "b3",
3114                        DataType::Map(MapType::from_kv(DataType::Varchar, DataType::Jsonb)),
3115                    ),
3116                ]))),
3117                "b",
3118            ),
3119            Field::with_name(
3120                DataType::Map(MapType::from_kv(
3121                    DataType::Varchar,
3122                    DataType::list(DataType::Struct(StructType::new([
3123                        ("c1", DataType::Int32),
3124                        ("c2", DataType::Bytea),
3125                        (
3126                            "c3",
3127                            DataType::Map(MapType::from_kv(DataType::Varchar, DataType::Jsonb)),
3128                        ),
3129                    ]))),
3130                )),
3131                "c",
3132            ),
3133        ]);
3134        let arrow_schema = ArrowSchema::new(vec![
3135            ArrowField::new(
3136                "a",
3137                ArrowDataType::Struct(ArrowFields::from(vec![
3138                    ArrowField::new("a1", ArrowDataType::Int32, false),
3139                    ArrowField::new(
3140                        "a2",
3141                        ArrowDataType::Struct(ArrowFields::from(vec![
3142                            ArrowField::new("a21", ArrowDataType::LargeBinary, false),
3143                            ArrowField::new_map(
3144                                "a22",
3145                                "entries",
3146                                ArrowFieldRef::new(ArrowField::new(
3147                                    "key",
3148                                    ArrowDataType::Utf8,
3149                                    false,
3150                                )),
3151                                ArrowFieldRef::new(ArrowField::new(
3152                                    "value",
3153                                    ArrowDataType::Utf8,
3154                                    false,
3155                                )),
3156                                false,
3157                                false,
3158                            ),
3159                        ])),
3160                        false,
3161                    ),
3162                ])),
3163                false,
3164            ),
3165            ArrowField::new(
3166                "b",
3167                ArrowDataType::List(ArrowFieldRef::new(ArrowField::new_list_field(
3168                    ArrowDataType::Struct(ArrowFields::from(vec![
3169                        ArrowField::new("b1", ArrowDataType::Int32, false),
3170                        ArrowField::new("b2", ArrowDataType::LargeBinary, false),
3171                        ArrowField::new_map(
3172                            "b3",
3173                            "entries",
3174                            ArrowFieldRef::new(ArrowField::new("key", ArrowDataType::Utf8, false)),
3175                            ArrowFieldRef::new(ArrowField::new(
3176                                "value",
3177                                ArrowDataType::Utf8,
3178                                false,
3179                            )),
3180                            false,
3181                            false,
3182                        ),
3183                    ])),
3184                    false,
3185                ))),
3186                false,
3187            ),
3188            ArrowField::new_map(
3189                "c",
3190                "entries",
3191                ArrowFieldRef::new(ArrowField::new("key", ArrowDataType::Utf8, false)),
3192                ArrowFieldRef::new(ArrowField::new(
3193                    "value",
3194                    ArrowDataType::List(ArrowFieldRef::new(ArrowField::new_list_field(
3195                        ArrowDataType::Struct(ArrowFields::from(vec![
3196                            ArrowField::new("c1", ArrowDataType::Int32, false),
3197                            ArrowField::new("c2", ArrowDataType::LargeBinary, false),
3198                            ArrowField::new_map(
3199                                "c3",
3200                                "entries",
3201                                ArrowFieldRef::new(ArrowField::new(
3202                                    "key",
3203                                    ArrowDataType::Utf8,
3204                                    false,
3205                                )),
3206                                ArrowFieldRef::new(ArrowField::new(
3207                                    "value",
3208                                    ArrowDataType::Utf8,
3209                                    false,
3210                                )),
3211                                false,
3212                                false,
3213                            ),
3214                        ])),
3215                        false,
3216                    ))),
3217                    false,
3218                )),
3219                false,
3220                false,
3221            ),
3222        ]);
3223        try_matches_arrow_schema(&risingwave_schema, &arrow_schema).unwrap();
3224    }
3225
    /// Round-trips a fully-populated upsert config (JDBC catalog, every
    /// supported `partition_by` transform spelling, compaction and
    /// snapshot-expiration toggles) through `from_btreemap` and compares the
    /// result field by field against a hand-written `IcebergConfig`.
    #[test]
    fn test_parse_iceberg_config() {
        let values = [
            ("connector", "iceberg"),
            ("type", "upsert"),
            ("primary_key", "v1"),
            // Exercises every partition transform in a single option string.
            ("partition_by", "v1, identity(v1), truncate(4,v2), bucket(5,v1), year(v3), month(v4), day(v5), hour(v6), void(v1)"),
            ("warehouse.path", "s3://iceberg"),
            ("s3.endpoint", "http://127.0.0.1:9301"),
            ("s3.access.key", "hummockadmin"),
            ("s3.secret.key", "hummockadmin"),
            ("s3.path.style.access", "true"),
            ("s3.region", "us-east-1"),
            ("catalog.type", "jdbc"),
            ("catalog.name", "demo"),
            ("catalog.uri", "jdbc://postgresql://postgres:5432/iceberg"),
            // `catalog.jdbc.*` keys are expected to surface (minus the
            // `catalog.` prefix) in `java_catalog_props` — asserted below.
            ("catalog.jdbc.user", "admin"),
            ("catalog.jdbc.password", "123456"),
            ("database.name", "demo_db"),
            ("table.name", "demo_table"),
            ("enable_compaction", "true"),
            ("compaction_interval_sec", "1800"),
            ("enable_snapshot_expiration", "true"),
        ]
        .into_iter()
        .map(|(k, v)| (k.to_owned(), v.to_owned()))
        .collect();

        let iceberg_config = IcebergConfig::from_btreemap(values).unwrap();

        // Expected parse result. Any field without a matching key in `values`
        // carries the parser's default (mostly `None`/`false`).
        let expected_iceberg_config = IcebergConfig {
            common: IcebergCommon {
                warehouse_path: Some("s3://iceberg".to_owned()),
                catalog_uri: Some("jdbc://postgresql://postgres:5432/iceberg".to_owned()),
                s3_region: Some("us-east-1".to_owned()),
                s3_endpoint: Some("http://127.0.0.1:9301".to_owned()),
                s3_access_key: Some("hummockadmin".to_owned()),
                s3_secret_key: Some("hummockadmin".to_owned()),
                s3_iam_role_arn: None,
                gcs_credential: None,
                catalog_type: Some("jdbc".to_owned()),
                glue_id: None,
                glue_region: None,
                glue_access_key: None,
                glue_secret_key: None,
                glue_iam_role_arn: None,
                catalog_name: Some("demo".to_owned()),
                s3_path_style_access: Some(true),
                catalog_credential: None,
                catalog_oauth2_server_uri: None,
                catalog_scope: None,
                catalog_token: None,
                enable_config_load: None,
                rest_signing_name: None,
                rest_signing_region: None,
                rest_sigv4_enabled: None,
                hosted_catalog: None,
                azblob_account_name: None,
                azblob_account_key: None,
                azblob_endpoint_url: None,
                catalog_header: None,
                adlsgen2_account_name: None,
                adlsgen2_account_key: None,
                adlsgen2_endpoint: None,
                vended_credentials: None,
                catalog_security: None,
                gcp_auth_scopes: None,
                catalog_io_impl: None,
            },
            table: IcebergTableIdentifier {
                database_name: Some("demo_db".to_owned()),
                table_name: "demo_table".to_owned(),
            },
            r#type: "upsert".to_owned(),
            force_append_only: false,
            primary_key: Some(vec!["v1".to_owned()]),
            partition_by: Some("v1, identity(v1), truncate(4,v2), bucket(5,v1), year(v3), month(v4), day(v5), hour(v6), void(v1)".to_owned()),
            // `catalog.jdbc.user` / `catalog.jdbc.password` routed here.
            java_catalog_props: [("jdbc.user", "admin"), ("jdbc.password", "123456")]
                .into_iter()
                .map(|(k, v)| (k.to_owned(), v.to_owned()))
                .collect(),
            commit_checkpoint_interval: ICEBERG_DEFAULT_COMMIT_CHECKPOINT_INTERVAL,
            commit_checkpoint_size_threshold_mb: Some(
                ICEBERG_DEFAULT_COMMIT_CHECKPOINT_SIZE_THRESHOLD_MB,
            ),
            create_table_if_not_exists: false,
            is_exactly_once: Some(true),
            commit_retry_num: 8,
            enable_compaction: true,
            // Matches the "1800" set above; written as half the default —
            // NOTE(review): presumes DEFAULT_ICEBERG_COMPACTION_INTERVAL == 3600.
            compaction_interval_sec: Some(DEFAULT_ICEBERG_COMPACTION_INTERVAL / 2),
            enable_snapshot_expiration: true,
            write_mode: IcebergWriteMode::MergeOnRead,
            format_version: FormatVersion::V2,
            snapshot_expiration_max_age_millis: None,
            snapshot_expiration_retain_last: None,
            snapshot_expiration_clear_expired_files: true,
            snapshot_expiration_clear_expired_meta_data: true,
            max_snapshots_num_before_compaction: None,
            small_files_threshold_mb: None,
            delete_files_count_threshold: None,
            trigger_snapshot_count: None,
            target_file_size_mb: None,
            compaction_type: None,
            write_parquet_compression: None,
            write_parquet_max_row_group_rows: None,
        };

        assert_eq!(iceberg_config, expected_iceberg_config);

        // The fully-qualified name is "<database>.<table>".
        assert_eq!(
            &iceberg_config.full_table_name().unwrap().to_string(),
            "demo_db.demo_table"
        );
    }
3340
3341    #[test]
3342    fn test_parse_commit_checkpoint_size_threshold() {
3343        let values: BTreeMap<String, String> = [
3344            ("connector", "iceberg"),
3345            ("type", "append-only"),
3346            ("force_append_only", "true"),
3347            ("catalog.name", "test-catalog"),
3348            ("catalog.type", "storage"),
3349            ("warehouse.path", "s3://my-bucket/warehouse"),
3350            ("database.name", "test_db"),
3351            ("table.name", "test_table"),
3352            ("commit_checkpoint_size_threshold_mb", "128"),
3353        ]
3354        .into_iter()
3355        .map(|(k, v)| (k.to_owned(), v.to_owned()))
3356        .collect();
3357
3358        let config = IcebergConfig::from_btreemap(values).unwrap();
3359        assert_eq!(config.commit_checkpoint_size_threshold_mb, Some(128));
3360        assert_eq!(
3361            config.commit_checkpoint_size_threshold_bytes(),
3362            Some(128 * 1024 * 1024)
3363        );
3364    }
3365
3366    #[test]
3367    fn test_default_commit_checkpoint_size_threshold() {
3368        let values: BTreeMap<String, String> = [
3369            ("connector", "iceberg"),
3370            ("type", "append-only"),
3371            ("force_append_only", "true"),
3372            ("catalog.name", "test-catalog"),
3373            ("catalog.type", "storage"),
3374            ("warehouse.path", "s3://my-bucket/warehouse"),
3375            ("database.name", "test_db"),
3376            ("table.name", "test_table"),
3377        ]
3378        .into_iter()
3379        .map(|(k, v)| (k.to_owned(), v.to_owned()))
3380        .collect();
3381
3382        let config = IcebergConfig::from_btreemap(values).unwrap();
3383        assert_eq!(
3384            config.commit_checkpoint_size_threshold_mb,
3385            Some(ICEBERG_DEFAULT_COMMIT_CHECKPOINT_SIZE_THRESHOLD_MB)
3386        );
3387        assert_eq!(
3388            config.commit_checkpoint_size_threshold_bytes(),
3389            Some(ICEBERG_DEFAULT_COMMIT_CHECKPOINT_SIZE_THRESHOLD_MB * 1024 * 1024)
3390        );
3391    }
3392
3393    #[test]
3394    fn test_reject_zero_commit_checkpoint_size_threshold() {
3395        let values: BTreeMap<String, String> = [
3396            ("connector", "iceberg"),
3397            ("type", "append-only"),
3398            ("force_append_only", "true"),
3399            ("catalog.name", "test-catalog"),
3400            ("catalog.type", "storage"),
3401            ("warehouse.path", "s3://my-bucket/warehouse"),
3402            ("database.name", "test_db"),
3403            ("table.name", "test_table"),
3404            ("commit_checkpoint_size_threshold_mb", "0"),
3405        ]
3406        .into_iter()
3407        .map(|(k, v)| (k.to_owned(), v.to_owned()))
3408        .collect();
3409
3410        let err = IcebergConfig::from_btreemap(values).unwrap_err();
3411        assert!(
3412            err.to_string()
3413                .contains("`commit_checkpoint_size_threshold_mb` must be greater than 0")
3414        );
3415    }
3416
3417    async fn test_create_catalog(configs: BTreeMap<String, String>) {
3418        let iceberg_config = IcebergConfig::from_btreemap(configs).unwrap();
3419
3420        let _table = iceberg_config.load_table().await.unwrap();
3421    }
3422
3423    #[tokio::test]
3424    #[ignore]
3425    async fn test_storage_catalog() {
3426        let values = [
3427            ("connector", "iceberg"),
3428            ("type", "append-only"),
3429            ("force_append_only", "true"),
3430            ("s3.endpoint", "http://127.0.0.1:9301"),
3431            ("s3.access.key", "hummockadmin"),
3432            ("s3.secret.key", "hummockadmin"),
3433            ("s3.region", "us-east-1"),
3434            ("s3.path.style.access", "true"),
3435            ("catalog.name", "demo"),
3436            ("catalog.type", "storage"),
3437            ("warehouse.path", "s3://icebergdata/demo"),
3438            ("database.name", "s1"),
3439            ("table.name", "t1"),
3440        ]
3441        .into_iter()
3442        .map(|(k, v)| (k.to_owned(), v.to_owned()))
3443        .collect();
3444
3445        test_create_catalog(values).await;
3446    }
3447
3448    #[tokio::test]
3449    #[ignore]
3450    async fn test_rest_catalog() {
3451        let values = [
3452            ("connector", "iceberg"),
3453            ("type", "append-only"),
3454            ("force_append_only", "true"),
3455            ("s3.endpoint", "http://127.0.0.1:9301"),
3456            ("s3.access.key", "hummockadmin"),
3457            ("s3.secret.key", "hummockadmin"),
3458            ("s3.region", "us-east-1"),
3459            ("s3.path.style.access", "true"),
3460            ("catalog.name", "demo"),
3461            ("catalog.type", "rest"),
3462            ("catalog.uri", "http://192.168.167.4:8181"),
3463            ("warehouse.path", "s3://icebergdata/demo"),
3464            ("database.name", "s1"),
3465            ("table.name", "t1"),
3466        ]
3467        .into_iter()
3468        .map(|(k, v)| (k.to_owned(), v.to_owned()))
3469        .collect();
3470
3471        test_create_catalog(values).await;
3472    }
3473
3474    #[tokio::test]
3475    #[ignore]
3476    async fn test_jdbc_catalog() {
3477        let values = [
3478            ("connector", "iceberg"),
3479            ("type", "append-only"),
3480            ("force_append_only", "true"),
3481            ("s3.endpoint", "http://127.0.0.1:9301"),
3482            ("s3.access.key", "hummockadmin"),
3483            ("s3.secret.key", "hummockadmin"),
3484            ("s3.region", "us-east-1"),
3485            ("s3.path.style.access", "true"),
3486            ("catalog.name", "demo"),
3487            ("catalog.type", "jdbc"),
3488            ("catalog.uri", "jdbc:postgresql://localhost:5432/iceberg"),
3489            ("catalog.jdbc.user", "admin"),
3490            ("catalog.jdbc.password", "123456"),
3491            ("warehouse.path", "s3://icebergdata/demo"),
3492            ("database.name", "s1"),
3493            ("table.name", "t1"),
3494        ]
3495        .into_iter()
3496        .map(|(k, v)| (k.to_owned(), v.to_owned()))
3497        .collect();
3498
3499        test_create_catalog(values).await;
3500    }
3501
3502    #[tokio::test]
3503    #[ignore]
3504    async fn test_hive_catalog() {
3505        let values = [
3506            ("connector", "iceberg"),
3507            ("type", "append-only"),
3508            ("force_append_only", "true"),
3509            ("s3.endpoint", "http://127.0.0.1:9301"),
3510            ("s3.access.key", "hummockadmin"),
3511            ("s3.secret.key", "hummockadmin"),
3512            ("s3.region", "us-east-1"),
3513            ("s3.path.style.access", "true"),
3514            ("catalog.name", "demo"),
3515            ("catalog.type", "hive"),
3516            ("catalog.uri", "thrift://localhost:9083"),
3517            ("warehouse.path", "s3://icebergdata/demo"),
3518            ("database.name", "s1"),
3519            ("table.name", "t1"),
3520        ]
3521        .into_iter()
3522        .map(|(k, v)| (k.to_owned(), v.to_owned()))
3523        .collect();
3524
3525        test_create_catalog(values).await;
3526    }
3527
3528    /// Test parsing Google/BigLake authentication configuration.
3529    #[test]
3530    fn test_parse_google_auth_config() {
3531        let values: BTreeMap<String, String> = [
3532            ("connector", "iceberg"),
3533            ("type", "append-only"),
3534            ("force_append_only", "true"),
3535            ("catalog.name", "biglake-catalog"),
3536            ("catalog.type", "rest"),
3537            ("catalog.uri", "https://biglake.googleapis.com/iceberg/v1/restcatalog"),
3538            ("warehouse.path", "bq://projects/my-gcp-project"),
3539            ("catalog.header", "x-goog-user-project=my-gcp-project"),
3540            ("catalog.security", "google"),
3541            ("gcp.auth.scopes", "https://www.googleapis.com/auth/cloud-platform,https://www.googleapis.com/auth/bigquery"),
3542            ("database.name", "my_dataset"),
3543            ("table.name", "my_table"),
3544        ]
3545        .into_iter()
3546        .map(|(k, v)| (k.to_owned(), v.to_owned()))
3547        .collect();
3548
3549        let config = IcebergConfig::from_btreemap(values).unwrap();
3550        assert_eq!(config.catalog_type(), "rest");
3551        assert_eq!(config.common.catalog_security.as_deref(), Some("google"));
3552        assert_eq!(
3553            config.common.gcp_auth_scopes.as_deref(),
3554            Some(
3555                "https://www.googleapis.com/auth/cloud-platform,https://www.googleapis.com/auth/bigquery"
3556            )
3557        );
3558        assert_eq!(
3559            config.common.warehouse_path.as_deref(),
3560            Some("bq://projects/my-gcp-project")
3561        );
3562        assert_eq!(
3563            config.common.catalog_header.as_deref(),
3564            Some("x-goog-user-project=my-gcp-project")
3565        );
3566    }
3567
3568    /// Test parsing `oauth2` security configuration.
3569    #[test]
3570    fn test_parse_oauth2_security_config() {
3571        let values: BTreeMap<String, String> = [
3572            ("connector", "iceberg"),
3573            ("type", "append-only"),
3574            ("force_append_only", "true"),
3575            ("catalog.name", "oauth2-catalog"),
3576            ("catalog.type", "rest"),
3577            ("catalog.uri", "https://example.com/iceberg/rest"),
3578            ("warehouse.path", "s3://my-bucket/warehouse"),
3579            ("catalog.security", "oauth2"),
3580            ("catalog.credential", "client_id:client_secret"),
3581            ("catalog.token", "bearer-token"),
3582            (
3583                "catalog.oauth2_server_uri",
3584                "https://oauth.example.com/token",
3585            ),
3586            ("catalog.scope", "read write"),
3587            ("database.name", "test_db"),
3588            ("table.name", "test_table"),
3589        ]
3590        .into_iter()
3591        .map(|(k, v)| (k.to_owned(), v.to_owned()))
3592        .collect();
3593
3594        let iceberg_config = IcebergConfig::from_btreemap(values).unwrap();
3595
3596        // Verify catalog type
3597        assert_eq!(iceberg_config.catalog_type(), "rest");
3598
3599        // Verify OAuth2-specific options
3600        assert_eq!(
3601            iceberg_config.common.catalog_security.as_deref(),
3602            Some("oauth2")
3603        );
3604        assert_eq!(
3605            iceberg_config.common.catalog_credential.as_deref(),
3606            Some("client_id:client_secret")
3607        );
3608        assert_eq!(
3609            iceberg_config.common.catalog_token.as_deref(),
3610            Some("bearer-token")
3611        );
3612        assert_eq!(
3613            iceberg_config.common.catalog_oauth2_server_uri.as_deref(),
3614            Some("https://oauth.example.com/token")
3615        );
3616        assert_eq!(
3617            iceberg_config.common.catalog_scope.as_deref(),
3618            Some("read write")
3619        );
3620    }
3621
3622    /// Test parsing invalid security configuration.
3623    #[test]
3624    fn test_parse_invalid_security_config() {
3625        let values: BTreeMap<String, String> = [
3626            ("connector", "iceberg"),
3627            ("type", "append-only"),
3628            ("force_append_only", "true"),
3629            ("catalog.name", "invalid-catalog"),
3630            ("catalog.type", "rest"),
3631            ("catalog.uri", "https://example.com/iceberg/rest"),
3632            ("warehouse.path", "s3://my-bucket/warehouse"),
3633            ("catalog.security", "invalid_security_type"),
3634            ("database.name", "test_db"),
3635            ("table.name", "test_table"),
3636        ]
3637        .into_iter()
3638        .map(|(k, v)| (k.to_owned(), v.to_owned()))
3639        .collect();
3640
3641        // This should still parse successfully, but with a warning for unknown security type
3642        let iceberg_config = IcebergConfig::from_btreemap(values).unwrap();
3643
3644        // Verify that the invalid security type is still stored
3645        assert_eq!(
3646            iceberg_config.common.catalog_security.as_deref(),
3647            Some("invalid_security_type")
3648        );
3649
3650        // Verify catalog type
3651        assert_eq!(iceberg_config.catalog_type(), "rest");
3652    }
3653
3654    /// Test parsing custom `FileIO` implementation configuration.
3655    #[test]
3656    fn test_parse_custom_io_impl_config() {
3657        let values: BTreeMap<String, String> = [
3658            ("connector", "iceberg"),
3659            ("type", "append-only"),
3660            ("force_append_only", "true"),
3661            ("catalog.name", "gcs-catalog"),
3662            ("catalog.type", "rest"),
3663            ("catalog.uri", "https://example.com/iceberg/rest"),
3664            ("warehouse.path", "gs://my-bucket/warehouse"),
3665            ("catalog.security", "google"),
3666            ("catalog.io_impl", "org.apache.iceberg.gcp.gcs.GCSFileIO"),
3667            ("database.name", "test_db"),
3668            ("table.name", "test_table"),
3669        ]
3670        .into_iter()
3671        .map(|(k, v)| (k.to_owned(), v.to_owned()))
3672        .collect();
3673
3674        let iceberg_config = IcebergConfig::from_btreemap(values).unwrap();
3675
3676        // Verify catalog type
3677        assert_eq!(iceberg_config.catalog_type(), "rest");
3678
3679        // Verify custom `FileIO` implementation
3680        assert_eq!(
3681            iceberg_config.common.catalog_io_impl.as_deref(),
3682            Some("org.apache.iceberg.gcp.gcs.GCSFileIO")
3683        );
3684
3685        // Verify Google security is set
3686        assert_eq!(
3687            iceberg_config.common.catalog_security.as_deref(),
3688            Some("google")
3689        );
3690    }
3691
3692    #[test]
3693    fn test_config_constants_consistency() {
3694        // This test ensures our constants match the expected configuration names
3695        // If you change a constant, this test will remind you to update both places
3696        assert_eq!(ENABLE_COMPACTION, "enable_compaction");
3697        assert_eq!(COMPACTION_INTERVAL_SEC, "compaction_interval_sec");
3698        assert_eq!(ENABLE_SNAPSHOT_EXPIRATION, "enable_snapshot_expiration");
3699        assert_eq!(WRITE_MODE, "write_mode");
3700        assert_eq!(
3701            COMMIT_CHECKPOINT_SIZE_THRESHOLD_MB,
3702            "commit_checkpoint_size_threshold_mb"
3703        );
3704        assert_eq!(
3705            SNAPSHOT_EXPIRATION_RETAIN_LAST,
3706            "snapshot_expiration_retain_last"
3707        );
3708        assert_eq!(
3709            SNAPSHOT_EXPIRATION_MAX_AGE_MILLIS,
3710            "snapshot_expiration_max_age_millis"
3711        );
3712        assert_eq!(
3713            SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_FILES,
3714            "snapshot_expiration_clear_expired_files"
3715        );
3716        assert_eq!(
3717            SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_META_DATA,
3718            "snapshot_expiration_clear_expired_meta_data"
3719        );
3720        assert_eq!(COMPACTION_MAX_SNAPSHOTS_NUM, "compaction.max_snapshots_num");
3721    }
3722
3723    /// Test parsing all compaction.* prefix configs and their default values.
3724    #[test]
3725    fn test_parse_compaction_config() {
3726        // Test with all compaction configs specified
3727        let values: BTreeMap<String, String> = [
3728            ("connector", "iceberg"),
3729            ("type", "upsert"),
3730            ("primary_key", "id"),
3731            ("warehouse.path", "s3://iceberg"),
3732            ("s3.endpoint", "http://127.0.0.1:9301"),
3733            ("s3.access.key", "test"),
3734            ("s3.secret.key", "test"),
3735            ("s3.region", "us-east-1"),
3736            ("catalog.type", "storage"),
3737            ("catalog.name", "demo"),
3738            ("database.name", "test_db"),
3739            ("table.name", "test_table"),
3740            ("enable_compaction", "true"),
3741            ("compaction.max_snapshots_num", "100"),
3742            ("compaction.small_files_threshold_mb", "512"),
3743            ("compaction.delete_files_count_threshold", "50"),
3744            ("compaction.trigger_snapshot_count", "10"),
3745            ("compaction.target_file_size_mb", "256"),
3746            ("compaction.type", "full"),
3747            ("compaction.write_parquet_compression", "zstd"),
3748            ("compaction.write_parquet_max_row_group_rows", "50000"),
3749        ]
3750        .into_iter()
3751        .map(|(k, v)| (k.to_owned(), v.to_owned()))
3752        .collect();
3753
3754        let config = IcebergConfig::from_btreemap(values).unwrap();
3755        assert!(config.enable_compaction);
3756        assert_eq!(config.max_snapshots_num_before_compaction, Some(100));
3757        assert_eq!(config.small_files_threshold_mb, Some(512));
3758        assert_eq!(config.delete_files_count_threshold, Some(50));
3759        assert_eq!(config.trigger_snapshot_count, Some(10));
3760        assert_eq!(config.target_file_size_mb, Some(256));
3761        assert_eq!(config.compaction_type, Some(CompactionType::Full));
3762        assert_eq!(config.target_file_size_mb(), 256);
3763        assert_eq!(config.write_parquet_compression(), "zstd");
3764        assert_eq!(config.write_parquet_max_row_group_rows(), 50000);
3765
3766        // Test default values (no compaction configs specified)
3767        let values: BTreeMap<String, String> = [
3768            ("connector", "iceberg"),
3769            ("type", "append-only"),
3770            ("force_append_only", "true"),
3771            ("catalog.name", "test-catalog"),
3772            ("catalog.type", "storage"),
3773            ("warehouse.path", "s3://my-bucket/warehouse"),
3774            ("database.name", "test_db"),
3775            ("table.name", "test_table"),
3776        ]
3777        .into_iter()
3778        .map(|(k, v)| (k.to_owned(), v.to_owned()))
3779        .collect();
3780
3781        let config = IcebergConfig::from_btreemap(values).unwrap();
3782        assert_eq!(config.target_file_size_mb(), 1024); // Default
3783        assert_eq!(config.write_parquet_compression(), "zstd"); // Default
3784        assert_eq!(config.write_parquet_max_row_group_rows(), 122880); // Default
3785    }
3786
3787    /// Test parquet compression parsing.
3788    #[test]
3789    fn test_parse_parquet_compression() {
3790        use parquet::basic::Compression;
3791
3792        use super::parse_parquet_compression;
3793
3794        // Test valid compression types
3795        assert!(matches!(
3796            parse_parquet_compression("snappy"),
3797            Compression::SNAPPY
3798        ));
3799        assert!(matches!(
3800            parse_parquet_compression("gzip"),
3801            Compression::GZIP(_)
3802        ));
3803        assert!(matches!(
3804            parse_parquet_compression("zstd"),
3805            Compression::ZSTD(_)
3806        ));
3807        assert!(matches!(parse_parquet_compression("lz4"), Compression::LZ4));
3808        assert!(matches!(
3809            parse_parquet_compression("brotli"),
3810            Compression::BROTLI(_)
3811        ));
3812        assert!(matches!(
3813            parse_parquet_compression("uncompressed"),
3814            Compression::UNCOMPRESSED
3815        ));
3816
3817        // Test case insensitivity
3818        assert!(matches!(
3819            parse_parquet_compression("SNAPPY"),
3820            Compression::SNAPPY
3821        ));
3822        assert!(matches!(
3823            parse_parquet_compression("Zstd"),
3824            Compression::ZSTD(_)
3825        ));
3826
3827        // Test invalid compression (should fall back to SNAPPY)
3828        assert!(matches!(
3829            parse_parquet_compression("invalid"),
3830            Compression::SNAPPY
3831        ));
3832    }
3833
3834    #[test]
3835    fn test_append_only_rejects_copy_on_write() {
3836        // Test that append-only sinks reject copy-on-write mode
3837        let values = [
3838            ("connector", "iceberg"),
3839            ("type", "append-only"),
3840            ("warehouse.path", "s3://iceberg"),
3841            ("s3.endpoint", "http://127.0.0.1:9301"),
3842            ("s3.access.key", "test"),
3843            ("s3.secret.key", "test"),
3844            ("s3.region", "us-east-1"),
3845            ("catalog.type", "storage"),
3846            ("catalog.name", "demo"),
3847            ("database.name", "test_db"),
3848            ("table.name", "test_table"),
3849            ("write_mode", "copy-on-write"),
3850        ]
3851        .into_iter()
3852        .map(|(k, v)| (k.to_owned(), v.to_owned()))
3853        .collect();
3854
3855        let result = IcebergConfig::from_btreemap(values);
3856        assert!(result.is_err());
3857        assert!(
3858            result
3859                .unwrap_err()
3860                .to_string()
3861                .contains("'copy-on-write' mode is not supported for append-only iceberg sink")
3862        );
3863    }
3864
3865    #[test]
3866    fn test_append_only_accepts_merge_on_read() {
3867        // Test that append-only sinks accept merge-on-read mode (explicit)
3868        let values = [
3869            ("connector", "iceberg"),
3870            ("type", "append-only"),
3871            ("warehouse.path", "s3://iceberg"),
3872            ("s3.endpoint", "http://127.0.0.1:9301"),
3873            ("s3.access.key", "test"),
3874            ("s3.secret.key", "test"),
3875            ("s3.region", "us-east-1"),
3876            ("catalog.type", "storage"),
3877            ("catalog.name", "demo"),
3878            ("database.name", "test_db"),
3879            ("table.name", "test_table"),
3880            ("write_mode", "merge-on-read"),
3881        ]
3882        .into_iter()
3883        .map(|(k, v)| (k.to_owned(), v.to_owned()))
3884        .collect();
3885
3886        let result = IcebergConfig::from_btreemap(values);
3887        assert!(result.is_ok());
3888        let config = result.unwrap();
3889        assert_eq!(config.write_mode, IcebergWriteMode::MergeOnRead);
3890    }
3891
3892    #[test]
3893    fn test_append_only_defaults_to_merge_on_read() {
3894        // Test that append-only sinks default to merge-on-read when write_mode is not specified
3895        let values = [
3896            ("connector", "iceberg"),
3897            ("type", "append-only"),
3898            ("warehouse.path", "s3://iceberg"),
3899            ("s3.endpoint", "http://127.0.0.1:9301"),
3900            ("s3.access.key", "test"),
3901            ("s3.secret.key", "test"),
3902            ("s3.region", "us-east-1"),
3903            ("catalog.type", "storage"),
3904            ("catalog.name", "demo"),
3905            ("database.name", "test_db"),
3906            ("table.name", "test_table"),
3907        ]
3908        .into_iter()
3909        .map(|(k, v)| (k.to_owned(), v.to_owned()))
3910        .collect();
3911
3912        let result = IcebergConfig::from_btreemap(values);
3913        assert!(result.is_ok());
3914        let config = result.unwrap();
3915        assert_eq!(config.write_mode, IcebergWriteMode::MergeOnRead);
3916    }
3917
3918    #[test]
3919    fn test_upsert_accepts_copy_on_write() {
3920        // Test that upsert sinks accept copy-on-write mode
3921        let values = [
3922            ("connector", "iceberg"),
3923            ("type", "upsert"),
3924            ("primary_key", "id"),
3925            ("warehouse.path", "s3://iceberg"),
3926            ("s3.endpoint", "http://127.0.0.1:9301"),
3927            ("s3.access.key", "test"),
3928            ("s3.secret.key", "test"),
3929            ("s3.region", "us-east-1"),
3930            ("catalog.type", "storage"),
3931            ("catalog.name", "demo"),
3932            ("database.name", "test_db"),
3933            ("table.name", "test_table"),
3934            ("write_mode", "copy-on-write"),
3935        ]
3936        .into_iter()
3937        .map(|(k, v)| (k.to_owned(), v.to_owned()))
3938        .collect();
3939
3940        let result = IcebergConfig::from_btreemap(values);
3941        assert!(result.is_ok());
3942        let config = result.unwrap();
3943        assert_eq!(config.write_mode, IcebergWriteMode::CopyOnWrite);
3944    }
3945}