risingwave_connector/sink/iceberg/
mod.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod prometheus;
16use std::collections::{BTreeMap, HashMap};
17use std::fmt::Debug;
18use std::num::NonZeroU64;
19use std::str::FromStr;
20use std::sync::Arc;
21use std::time::Duration;
22
23use anyhow::{Context, anyhow};
24use async_trait::async_trait;
25use await_tree::InstrumentAwait;
26use iceberg::arrow::{
27    RecordBatchPartitionSplitter, arrow_schema_to_schema, schema_to_arrow_schema,
28};
29use iceberg::spec::{
30    DataFile, FormatVersion, MAIN_BRANCH, Operation, PartitionSpecRef,
31    SchemaRef as IcebergSchemaRef, SerializedDataFile, TableProperties, Transform,
32    UnboundPartitionField, UnboundPartitionSpec,
33};
34use iceberg::table::Table;
35use iceberg::transaction::{ApplyTransactionAction, FastAppendAction, Transaction};
36use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
37use iceberg::writer::base_writer::deletion_vector_writer::{
38    DeletionVectorWriter, DeletionVectorWriterBuilder,
39};
40use iceberg::writer::base_writer::equality_delete_writer::{
41    EqualityDeleteFileWriterBuilder, EqualityDeleteWriterConfig,
42};
43use iceberg::writer::base_writer::position_delete_file_writer::{
44    POSITION_DELETE_SCHEMA, PositionDeleteFileWriter, PositionDeleteFileWriterBuilder,
45    PositionDeleteInput,
46};
47use iceberg::writer::delta_writer::{DELETE_OP, DeltaWriterBuilder, INSERT_OP};
48use iceberg::writer::file_writer::ParquetWriterBuilder;
49use iceberg::writer::file_writer::location_generator::{
50    DefaultFileNameGenerator, DefaultLocationGenerator,
51};
52use iceberg::writer::file_writer::rolling_writer::RollingFileWriterBuilder;
53use iceberg::writer::task_writer::TaskWriter;
54use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
55use iceberg::{Catalog, NamespaceIdent, TableCreation, TableIdent};
56use itertools::Itertools;
57use parquet::basic::Compression;
58use parquet::file::properties::WriterProperties;
59use prometheus::monitored_general_writer::MonitoredGeneralWriterBuilder;
60use regex::Regex;
61use risingwave_common::array::arrow::arrow_array_iceberg::{Int32Array, RecordBatch};
62use risingwave_common::array::arrow::arrow_schema_iceberg::{
63    self, DataType as ArrowDataType, Field as ArrowField, Fields as ArrowFields,
64    Schema as ArrowSchema, SchemaRef,
65};
66use risingwave_common::array::arrow::{IcebergArrowConvert, IcebergCreateTableArrowConvert};
67use risingwave_common::array::{Op, StreamChunk};
68use risingwave_common::bail;
69use risingwave_common::bitmap::Bitmap;
70use risingwave_common::catalog::{Field, Schema};
71use risingwave_common::error::IcebergError;
72use risingwave_common::metrics::{LabelGuardedHistogram, LabelGuardedIntCounter};
73use risingwave_common_estimate_size::EstimateSize;
74use risingwave_pb::connector_service::SinkMetadata;
75use risingwave_pb::connector_service::sink_metadata::Metadata::Serialized;
76use risingwave_pb::connector_service::sink_metadata::SerializedMetadata;
77use risingwave_pb::stream_plan::PbSinkSchemaChange;
78use serde::de::{self, Deserializer, Visitor};
79use serde::{Deserialize, Serialize};
80use serde_json::from_value;
81use serde_with::{DisplayFromStr, serde_as};
82use thiserror_ext::AsReport;
83use tokio::sync::mpsc::UnboundedSender;
84use tokio_retry::RetryIf;
85use tokio_retry::strategy::{ExponentialBackoff, jitter};
86use tracing::warn;
87use url::Url;
88use uuid::Uuid;
89use with_options::WithOptions;
90
91use super::decouple_checkpoint_log_sink::iceberg_default_commit_checkpoint_interval;
92use super::{
93    GLOBAL_SINK_METRICS, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT, Sink,
94    SinkError, SinkWriterParam,
95};
96use crate::connector_common::{IcebergCommon, IcebergSinkCompactionUpdate, IcebergTableIdentifier};
97use crate::enforce_secret::EnforceSecret;
98use crate::sink::catalog::SinkId;
99use crate::sink::coordinate::CoordinatedLogSinker;
100use crate::sink::writer::SinkWriter;
101use crate::sink::{
102    Result, SinglePhaseCommitCoordinator, SinkCommitCoordinator, SinkParam,
103    TwoPhaseCommitCoordinator,
104};
105use crate::{deserialize_bool_from_string, deserialize_optional_string_seq_from_string};
106
/// Connector name registered for the iceberg sink.
pub const ICEBERG_SINK: &str = "iceberg";

// Branch name used by the copy-on-write path.
// NOTE(review): inferred from the `COW` in the name — confirm against the commit path.
pub const ICEBERG_COW_BRANCH: &str = "ingestion";
// Accepted values for the `write_mode` option (see `IcebergWriteMode`).
pub const ICEBERG_WRITE_MODE_MERGE_ON_READ: &str = "merge-on-read";
pub const ICEBERG_WRITE_MODE_COPY_ON_WRITE: &str = "copy-on-write";
// Accepted values for the `compaction.type` option (see `CompactionType`).
pub const ICEBERG_COMPACTION_TYPE_FULL: &str = "full";
pub const ICEBERG_COMPACTION_TYPE_SMALL_FILES: &str = "small-files";
pub const ICEBERG_COMPACTION_TYPE_FILES_WITH_DELETE: &str = "files-with-delete";

/// First field id assigned to partition data fields, matching the Iceberg
/// spec convention that partition field ids start at 1000.
pub const PARTITION_DATA_ID_START: i32 = 1000;
117
/// How row changes are materialized in the iceberg table.
///
/// Serialized in kebab-case so the variants match the `write_mode` option
/// strings (`merge-on-read`, `copy-on-write`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
pub enum IcebergWriteMode {
    /// Deletes are written as delete files and merged at read time (the default).
    #[default]
    MergeOnRead,
    /// Affected data files are rewritten at write time.
    CopyOnWrite,
}
125
126impl IcebergWriteMode {
127    pub fn as_str(self) -> &'static str {
128        match self {
129            IcebergWriteMode::MergeOnRead => ICEBERG_WRITE_MODE_MERGE_ON_READ,
130            IcebergWriteMode::CopyOnWrite => ICEBERG_WRITE_MODE_COPY_ON_WRITE,
131        }
132    }
133}
134
135impl std::str::FromStr for IcebergWriteMode {
136    type Err = SinkError;
137
138    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
139        match s {
140            ICEBERG_WRITE_MODE_MERGE_ON_READ => Ok(IcebergWriteMode::MergeOnRead),
141            ICEBERG_WRITE_MODE_COPY_ON_WRITE => Ok(IcebergWriteMode::CopyOnWrite),
142            _ => Err(SinkError::Config(anyhow!(format!(
143                "invalid write_mode: {}, must be one of: {}, {}",
144                s, ICEBERG_WRITE_MODE_MERGE_ON_READ, ICEBERG_WRITE_MODE_COPY_ON_WRITE
145            )))),
146        }
147    }
148}
149
150impl TryFrom<&str> for IcebergWriteMode {
151    type Error = <Self as std::str::FromStr>::Err;
152
153    fn try_from(value: &str) -> std::result::Result<Self, Self::Error> {
154        value.parse()
155    }
156}
157
158impl TryFrom<String> for IcebergWriteMode {
159    type Error = <Self as std::str::FromStr>::Err;
160
161    fn try_from(value: String) -> std::result::Result<Self, Self::Error> {
162        value.as_str().parse()
163    }
164}
165
impl std::fmt::Display for IcebergWriteMode {
    // Display mirrors `as_str`, so `to_string()` round-trips through `FromStr`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(self.as_str())
    }
}
171
// Configuration constants: WITH-option keys accepted by the iceberg sink.
// Each key mirrors the matching `#[serde(rename = "...")]` field on `IcebergConfig`.
pub const ENABLE_COMPACTION: &str = "enable_compaction";
pub const COMPACTION_INTERVAL_SEC: &str = "compaction_interval_sec";
pub const ENABLE_SNAPSHOT_EXPIRATION: &str = "enable_snapshot_expiration";
pub const WRITE_MODE: &str = "write_mode";
pub const FORMAT_VERSION: &str = "format_version";
pub const SNAPSHOT_EXPIRATION_RETAIN_LAST: &str = "snapshot_expiration_retain_last";
pub const SNAPSHOT_EXPIRATION_MAX_AGE_MILLIS: &str = "snapshot_expiration_max_age_millis";
pub const SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_FILES: &str = "snapshot_expiration_clear_expired_files";
pub const SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_META_DATA: &str =
    "snapshot_expiration_clear_expired_meta_data";
pub const COMPACTION_MAX_SNAPSHOTS_NUM: &str = "compaction.max_snapshots_num";

pub const COMPACTION_SMALL_FILES_THRESHOLD_MB: &str = "compaction.small_files_threshold_mb";

pub const COMPACTION_DELETE_FILES_COUNT_THRESHOLD: &str = "compaction.delete_files_count_threshold";

pub const COMPACTION_TRIGGER_SNAPSHOT_COUNT: &str = "compaction.trigger_snapshot_count";

pub const COMPACTION_TARGET_FILE_SIZE_MB: &str = "compaction.target_file_size_mb";

pub const COMPACTION_TYPE: &str = "compaction.type";

pub const COMPACTION_WRITE_PARQUET_COMPRESSION: &str = "compaction.write_parquet_compression";
pub const COMPACTION_WRITE_PARQUET_MAX_ROW_GROUP_ROWS: &str =
    "compaction.write_parquet_max_row_group_rows";
pub const COMMIT_CHECKPOINT_SIZE_THRESHOLD_MB: &str = "commit_checkpoint_size_threshold_mb";
/// Default for `commit_checkpoint_size_threshold_mb` (128 MB).
pub const ICEBERG_DEFAULT_COMMIT_CHECKPOINT_SIZE_THRESHOLD_MB: u64 = 128;

// Stamped into the parquet writer's `created_by` metadata, e.g. "risingwave version 1.2.3".
const PARQUET_CREATED_BY: &str = concat!("risingwave version ", env!("CARGO_PKG_VERSION"));
202
/// Serde default for `commit_retry_num`: retry a failed iceberg commit up to 8 times.
fn default_commit_retry_num() -> u32 {
    8
}

/// Serde default for `commit_checkpoint_size_threshold_mb` (128 MB).
fn default_commit_checkpoint_size_threshold_mb() -> Option<u64> {
    Some(ICEBERG_DEFAULT_COMMIT_CHECKPOINT_SIZE_THRESHOLD_MB)
}

/// Serde default for `write_mode`: merge-on-read.
fn default_iceberg_write_mode() -> IcebergWriteMode {
    IcebergWriteMode::MergeOnRead
}

/// Serde default for `format_version`: iceberg format v2.
fn default_iceberg_format_version() -> FormatVersion {
    FormatVersion::V2
}

/// Serde default helper returning `true`.
fn default_true() -> bool {
    true
}

/// Serde default helper returning `Some(true)` (used by `is_exactly_once`).
fn default_some_true() -> Option<bool> {
    Some(true)
}
226
227fn parse_format_version_str(value: &str) -> std::result::Result<FormatVersion, String> {
228    let parsed = value
229        .trim()
230        .parse::<u8>()
231        .map_err(|_| "`format-version` must be one of 1, 2, or 3".to_owned())?;
232    match parsed {
233        1 => Ok(FormatVersion::V1),
234        2 => Ok(FormatVersion::V2),
235        3 => Ok(FormatVersion::V3),
236        _ => Err("`format-version` must be one of 1, 2, or 3".to_owned()),
237    }
238}
239
240fn deserialize_format_version<'de, D>(
241    deserializer: D,
242) -> std::result::Result<FormatVersion, D::Error>
243where
244    D: Deserializer<'de>,
245{
246    struct FormatVersionVisitor;
247
248    impl<'de> Visitor<'de> for FormatVersionVisitor {
249        type Value = FormatVersion;
250
251        fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
252            formatter.write_str("format-version as 1, 2, or 3")
253        }
254
255        fn visit_u64<E>(self, value: u64) -> std::result::Result<Self::Value, E>
256        where
257            E: de::Error,
258        {
259            let value = u8::try_from(value)
260                .map_err(|_| E::custom("`format-version` must be one of 1, 2, or 3"))?;
261            parse_format_version_str(&value.to_string()).map_err(E::custom)
262        }
263
264        fn visit_i64<E>(self, value: i64) -> std::result::Result<Self::Value, E>
265        where
266            E: de::Error,
267        {
268            let value = u8::try_from(value)
269                .map_err(|_| E::custom("`format-version` must be one of 1, 2, or 3"))?;
270            parse_format_version_str(&value.to_string()).map_err(E::custom)
271        }
272
273        fn visit_str<E>(self, value: &str) -> std::result::Result<Self::Value, E>
274        where
275            E: de::Error,
276        {
277            parse_format_version_str(value).map_err(E::custom)
278        }
279
280        fn visit_string<E>(self, value: String) -> std::result::Result<Self::Value, E>
281        where
282            E: de::Error,
283        {
284            self.visit_str(&value)
285        }
286    }
287
288    deserializer.deserialize_any(FormatVersionVisitor)
289}
290
/// Compaction type for Iceberg sink
///
/// Serialized in kebab-case so the variants match the `compaction.type`
/// option strings (`full`, `small-files`, `files-with-delete`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(rename_all = "kebab-case")]
pub enum CompactionType {
    /// Full compaction - rewrites all data files
    #[default]
    Full,
    /// Small files compaction - only compact small files
    SmallFiles,
    /// Files with delete compaction - only compact files that have associated delete files
    FilesWithDelete,
}
303
304impl CompactionType {
305    pub fn as_str(&self) -> &'static str {
306        match self {
307            CompactionType::Full => ICEBERG_COMPACTION_TYPE_FULL,
308            CompactionType::SmallFiles => ICEBERG_COMPACTION_TYPE_SMALL_FILES,
309            CompactionType::FilesWithDelete => ICEBERG_COMPACTION_TYPE_FILES_WITH_DELETE,
310        }
311    }
312}
313
314impl std::str::FromStr for CompactionType {
315    type Err = SinkError;
316
317    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
318        match s {
319            ICEBERG_COMPACTION_TYPE_FULL => Ok(CompactionType::Full),
320            ICEBERG_COMPACTION_TYPE_SMALL_FILES => Ok(CompactionType::SmallFiles),
321            ICEBERG_COMPACTION_TYPE_FILES_WITH_DELETE => Ok(CompactionType::FilesWithDelete),
322            _ => Err(SinkError::Config(anyhow!(format!(
323                "invalid compaction_type: {}, must be one of: {}, {}, {}",
324                s,
325                ICEBERG_COMPACTION_TYPE_FULL,
326                ICEBERG_COMPACTION_TYPE_SMALL_FILES,
327                ICEBERG_COMPACTION_TYPE_FILES_WITH_DELETE
328            )))),
329        }
330    }
331}
332
333impl TryFrom<&str> for CompactionType {
334    type Error = <Self as std::str::FromStr>::Err;
335
336    fn try_from(value: &str) -> std::result::Result<Self, Self::Error> {
337        value.parse()
338    }
339}
340
341impl TryFrom<String> for CompactionType {
342    type Error = <Self as std::str::FromStr>::Err;
343
344    fn try_from(value: String) -> std::result::Result<Self, Self::Error> {
345        value.as_str().parse()
346    }
347}
348
349impl std::fmt::Display for CompactionType {
350    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
351        write!(f, "{}", self.as_str())
352    }
353}
354
/// WITH-option configuration for the iceberg sink, deserialized from the
/// user-provided properties map (see [`IcebergConfig::from_btreemap`]).
#[serde_as]
#[derive(Debug, Clone, PartialEq, Eq, Deserialize, WithOptions)]
pub struct IcebergConfig {
    /// Sink type; validated to be one of the two accepted values below.
    pub r#type: String, // accept "append-only" or "upsert"

    /// Force append-only semantics even for a changelog input.
    #[serde(default, deserialize_with = "deserialize_bool_from_string")]
    pub force_append_only: bool,

    /// Catalog / warehouse connection options (flattened from the same map).
    #[serde(flatten)]
    common: IcebergCommon,

    /// Target table identifier (database + table name).
    #[serde(flatten)]
    table: IcebergTableIdentifier,

    /// Upsert primary key columns; required and non-empty in upsert mode.
    #[serde(
        rename = "primary_key",
        default,
        deserialize_with = "deserialize_optional_string_seq_from_string"
    )]
    pub primary_key: Option<Vec<String>>,

    // Props for java catalog props.
    // Populated in `from_btreemap` from `catalog.*` keys (prefix stripped).
    #[serde(skip)]
    pub java_catalog_props: HashMap<String, String>,

    /// Raw `partition_by` expression string; parsed when creating the table.
    #[serde(default)]
    pub partition_by: Option<String>,

    /// Commit every n(>0) checkpoints, default is 60.
    #[serde(default = "iceberg_default_commit_checkpoint_interval")]
    #[serde_as(as = "DisplayFromStr")]
    #[with_option(allow_alter_on_fly)]
    pub commit_checkpoint_interval: u64,

    /// Commit on the next checkpoint barrier after buffered write size exceeds this threshold.
    /// Default is 128 MB.
    #[serde(default = "default_commit_checkpoint_size_threshold_mb")]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    pub commit_checkpoint_size_threshold_mb: Option<u64>,

    /// Create the target table on startup when it does not exist yet.
    #[serde(default, deserialize_with = "deserialize_bool_from_string")]
    pub create_table_if_not_exists: bool,

    /// Whether it is `exactly_once`, the default is true.
    #[serde(default = "default_some_true")]
    #[serde_as(as = "Option<DisplayFromStr>")]
    pub is_exactly_once: Option<bool>,
    // Retry commit num when iceberg commit fail. default is 8.
    // # TODO
    // Iceberg table may store the retry commit num in table meta.
    // We should try to find and use that as default commit retry num first.
    #[serde(default = "default_commit_retry_num")]
    pub commit_retry_num: u32,

    /// Whether to enable iceberg compaction.
    #[serde(
        rename = "enable_compaction",
        default,
        deserialize_with = "deserialize_bool_from_string"
    )]
    #[with_option(allow_alter_on_fly)]
    pub enable_compaction: bool,

    /// The interval of iceberg compaction
    #[serde(rename = "compaction_interval_sec", default)]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    pub compaction_interval_sec: Option<u64>,

    /// Whether to enable iceberg expired snapshots.
    #[serde(
        rename = "enable_snapshot_expiration",
        default,
        deserialize_with = "deserialize_bool_from_string"
    )]
    #[with_option(allow_alter_on_fly)]
    pub enable_snapshot_expiration: bool,

    /// The iceberg write mode, can be `merge-on-read` or `copy-on-write`.
    #[serde(rename = "write_mode", default = "default_iceberg_write_mode")]
    pub write_mode: IcebergWriteMode,

    /// Iceberg format version for table creation.
    #[serde(
        rename = "format_version",
        default = "default_iceberg_format_version",
        deserialize_with = "deserialize_format_version"
    )]
    pub format_version: FormatVersion,

    /// The maximum age (in milliseconds) for snapshots before they expire
    /// For example, if set to 3600000, snapshots older than 1 hour will be expired
    #[serde(rename = "snapshot_expiration_max_age_millis", default)]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    pub snapshot_expiration_max_age_millis: Option<i64>,

    /// The number of snapshots to retain
    #[serde(rename = "snapshot_expiration_retain_last", default)]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    pub snapshot_expiration_retain_last: Option<i32>,

    /// Whether expiring snapshots also removes their no-longer-referenced
    /// data files. Default is true.
    #[serde(
        rename = "snapshot_expiration_clear_expired_files",
        default = "default_true",
        deserialize_with = "deserialize_bool_from_string"
    )]
    #[with_option(allow_alter_on_fly)]
    pub snapshot_expiration_clear_expired_files: bool,

    /// Whether expiring snapshots also removes expired metadata. Default is true.
    #[serde(
        rename = "snapshot_expiration_clear_expired_meta_data",
        default = "default_true",
        deserialize_with = "deserialize_bool_from_string"
    )]
    #[with_option(allow_alter_on_fly)]
    pub snapshot_expiration_clear_expired_meta_data: bool,

    /// The maximum number of snapshots allowed since the last rewrite operation
    /// If set, sink will check snapshot count and wait if exceeded
    #[serde(rename = "compaction.max_snapshots_num", default)]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    pub max_snapshots_num_before_compaction: Option<usize>,

    /// Files below this size (in MB) are considered "small" for small-files
    /// compaction. Defaults to 64 (see `small_files_threshold_mb()`).
    #[serde(rename = "compaction.small_files_threshold_mb", default)]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    pub small_files_threshold_mb: Option<u64>,

    /// Delete-file count that triggers files-with-delete compaction.
    /// Defaults to 256 (see `delete_files_count_threshold()`).
    #[serde(rename = "compaction.delete_files_count_threshold", default)]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    pub delete_files_count_threshold: Option<usize>,

    /// Snapshot count that triggers compaction. Defaults to `usize::MAX`
    /// (effectively disabled; see `trigger_snapshot_count()`).
    #[serde(rename = "compaction.trigger_snapshot_count", default)]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    pub trigger_snapshot_count: Option<usize>,

    /// Target output file size (in MB) for compaction. Defaults to 1024
    /// (see `target_file_size_mb()`).
    #[serde(rename = "compaction.target_file_size_mb", default)]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    pub target_file_size_mb: Option<u64>,

    /// Compaction type: `full`, `small-files`, or `files-with-delete`
    /// If not set, will default to `full`
    #[serde(rename = "compaction.type", default)]
    #[with_option(allow_alter_on_fly)]
    pub compaction_type: Option<CompactionType>,

    /// Parquet compression codec
    /// Supported values: uncompressed, snappy, gzip, lzo, brotli, lz4, zstd
    /// Default is zstd (see `write_parquet_compression()`); unrecognized
    /// values fall back to snappy with a warning.
    #[serde(rename = "compaction.write_parquet_compression", default)]
    #[with_option(allow_alter_on_fly)]
    pub write_parquet_compression: Option<String>,

    /// Maximum number of rows in a Parquet row group
    /// Default is 122880 (from developer config)
    #[serde(rename = "compaction.write_parquet_max_row_group_rows", default)]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    pub write_parquet_max_row_group_rows: Option<usize>,
}
522
523impl EnforceSecret for IcebergConfig {
524    fn enforce_secret<'a>(
525        prop_iter: impl Iterator<Item = &'a str>,
526    ) -> crate::error::ConnectorResult<()> {
527        for prop in prop_iter {
528            IcebergCommon::enforce_one(prop)?;
529        }
530        Ok(())
531    }
532
533    fn enforce_one(prop: &str) -> crate::error::ConnectorResult<()> {
534        IcebergCommon::enforce_one(prop)
535    }
536}
537
impl IcebergConfig {
    /// Validate that append-only sinks use merge-on-read mode
    /// Copy-on-write is strictly worse than merge-on-read for append-only workloads
    fn validate_append_only_write_mode(
        sink_type: &str,
        write_mode: IcebergWriteMode,
    ) -> Result<()> {
        if sink_type == SINK_TYPE_APPEND_ONLY && write_mode == IcebergWriteMode::CopyOnWrite {
            return Err(SinkError::Config(anyhow!(
                "'copy-on-write' mode is not supported for append-only iceberg sink. \
                 Please use 'merge-on-read' instead, which is strictly better for append-only workloads."
            )));
        }
        Ok(())
    }

    /// Builds and validates an `IcebergConfig` from the raw WITH options.
    ///
    /// Validates (in order): the sink type, the upsert primary key, the
    /// write mode, and the commit-checkpoint settings. All `catalog.*`
    /// options except the reserved ones are collected into
    /// `java_catalog_props` with their prefix stripped.
    pub fn from_btreemap(values: BTreeMap<String, String>) -> Result<Self> {
        // Round-trip through serde_json so the flattened/renamed fields and
        // custom deserializers all apply to the plain string map.
        let mut config =
            serde_json::from_value::<IcebergConfig>(serde_json::to_value(&values).unwrap())
                .map_err(|e| SinkError::Config(anyhow!(e)))?;

        if config.r#type != SINK_TYPE_APPEND_ONLY && config.r#type != SINK_TYPE_UPSERT {
            return Err(SinkError::Config(anyhow!(
                "`{}` must be {}, or {}",
                SINK_TYPE_OPTION,
                SINK_TYPE_APPEND_ONLY,
                SINK_TYPE_UPSERT
            )));
        }

        if config.r#type == SINK_TYPE_UPSERT {
            if let Some(primary_key) = &config.primary_key {
                if primary_key.is_empty() {
                    return Err(SinkError::Config(anyhow!(
                        "`primary-key` must not be empty in {}",
                        SINK_TYPE_UPSERT
                    )));
                }
            } else {
                return Err(SinkError::Config(anyhow!(
                    "Must set `primary-key` in {}",
                    SINK_TYPE_UPSERT
                )));
            }
        }

        // Enforce merge-on-read for append-only sinks
        Self::validate_append_only_write_mode(&config.r#type, config.write_mode)?;

        // All configs start with "catalog." will be treated as java configs.
        config.java_catalog_props = values
            .iter()
            .filter(|(k, _v)| {
                k.starts_with("catalog.")
                    && k != &"catalog.uri"
                    && k != &"catalog.type"
                    && k != &"catalog.name"
                    && k != &"catalog.header"
            })
            // Strip the "catalog." prefix (8 chars) from each retained key.
            .map(|(k, v)| (k[8..].to_string(), v.clone()))
            .collect();

        if config.commit_checkpoint_interval == 0 {
            return Err(SinkError::Config(anyhow!(
                "`commit-checkpoint-interval` must be greater than 0"
            )));
        }

        if config.commit_checkpoint_size_threshold_mb == Some(0) {
            return Err(SinkError::Config(anyhow!(
                "`commit_checkpoint_size_threshold_mb` must be greater than 0"
            )));
        }

        Ok(config)
    }

    /// The configured catalog type (e.g. "rest").
    pub fn catalog_type(&self) -> &str {
        self.common.catalog_type()
    }

    /// Loads the target iceberg table through the configured catalog.
    pub async fn load_table(&self) -> Result<Table> {
        self.common
            .load_table(&self.table, &self.java_catalog_props)
            .await
            .map_err(Into::into)
    }

    /// Creates a catalog client from the configured connection options.
    pub async fn create_catalog(&self) -> Result<Arc<dyn Catalog>> {
        self.common
            .create_catalog(&self.java_catalog_props)
            .await
            .map_err(Into::into)
    }

    /// The fully-qualified table identifier parsed from the config.
    pub fn full_table_name(&self) -> Result<TableIdent> {
        self.table.to_table_ident().map_err(Into::into)
    }

    /// The configured catalog name.
    pub fn catalog_name(&self) -> String {
        self.common.catalog_name()
    }

    /// The iceberg format version to use when creating the table.
    pub fn table_format_version(&self) -> FormatVersion {
        self.format_version
    }

    pub fn compaction_interval_sec(&self) -> u64 {
        // default to 1 hour
        self.compaction_interval_sec.unwrap_or(3600)
    }

    /// Calculate the timestamp (in milliseconds) before which snapshots should be expired
    /// Returns `current_time_ms` - `max_age_millis`
    pub fn snapshot_expiration_timestamp_ms(&self, current_time_ms: i64) -> Option<i64> {
        self.snapshot_expiration_max_age_millis
            .map(|max_age_millis| current_time_ms - max_age_millis)
    }

    /// Snapshot count that triggers compaction; `usize::MAX` (never) when unset.
    pub fn trigger_snapshot_count(&self) -> usize {
        self.trigger_snapshot_count.unwrap_or(usize::MAX)
    }

    /// Small-file threshold in MB; 64 when unset.
    pub fn small_files_threshold_mb(&self) -> u64 {
        self.small_files_threshold_mb.unwrap_or(64)
    }

    /// Delete-file count threshold; 256 when unset.
    pub fn delete_files_count_threshold(&self) -> usize {
        self.delete_files_count_threshold.unwrap_or(256)
    }

    /// Compaction target file size in MB; 1024 when unset.
    pub fn target_file_size_mb(&self) -> u64 {
        self.target_file_size_mb.unwrap_or(1024)
    }

    /// The commit-size threshold converted from MB to bytes
    /// (saturating, so a huge MB value cannot overflow).
    pub fn commit_checkpoint_size_threshold_bytes(&self) -> Option<u64> {
        self.commit_checkpoint_size_threshold_mb
            .map(|threshold_mb| threshold_mb.saturating_mul(1024 * 1024))
    }

    /// Get the compaction type as an enum
    /// Defaults to `CompactionType::Full` when not configured
    pub fn compaction_type(&self) -> CompactionType {
        self.compaction_type.unwrap_or_default()
    }

    /// Get the parquet compression codec
    /// Default is "zstd"
    pub fn write_parquet_compression(&self) -> &str {
        self.write_parquet_compression.as_deref().unwrap_or("zstd")
    }

    /// Get the maximum number of rows in a Parquet row group
    /// Default is 122880 (from developer config default)
    pub fn write_parquet_max_row_group_rows(&self) -> usize {
        self.write_parquet_max_row_group_rows.unwrap_or(122880)
    }

    /// Parse the compression codec string into Parquet Compression enum
    /// The unset default is zstd (see `write_parquet_compression`); an
    /// unrecognized codec string falls back to SNAPPY with a warning.
    pub fn get_parquet_compression(&self) -> Compression {
        parse_parquet_compression(self.write_parquet_compression())
    }
}
702
703/// Parse compression codec string to Parquet Compression enum
704fn parse_parquet_compression(codec: &str) -> Compression {
705    match codec.to_lowercase().as_str() {
706        "uncompressed" => Compression::UNCOMPRESSED,
707        "snappy" => Compression::SNAPPY,
708        "gzip" => Compression::GZIP(Default::default()),
709        "lzo" => Compression::LZO,
710        "brotli" => Compression::BROTLI(Default::default()),
711        "lz4" => Compression::LZ4,
712        "zstd" => Compression::ZSTD(Default::default()),
713        _ => {
714            tracing::warn!(
715                "Unknown compression codec '{}', falling back to SNAPPY",
716                codec
717            );
718            Compression::SNAPPY
719        }
720    }
721}
722
/// The iceberg sink: the validated config plus the sink parameters it was
/// built from.
pub struct IcebergSink {
    pub config: IcebergConfig,
    param: SinkParam,
    // In upsert mode, it is never None nor empty.
    unique_column_ids: Option<Vec<usize>>,
}
729
730impl EnforceSecret for IcebergSink {
731    fn enforce_secret<'a>(
732        prop_iter: impl Iterator<Item = &'a str>,
733    ) -> crate::error::ConnectorResult<()> {
734        for prop in prop_iter {
735            IcebergConfig::enforce_one(prop)?;
736        }
737        Ok(())
738    }
739}
740
741impl TryFrom<SinkParam> for IcebergSink {
742    type Error = SinkError;
743
744    fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
745        let config = IcebergConfig::from_btreemap(param.properties.clone())?;
746        IcebergSink::new(config, param)
747    }
748}
749
impl Debug for IcebergSink {
    // Intentionally prints only `config`; `param` and `unique_column_ids`
    // are omitted from the debug output.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("IcebergSink")
            .field("config", &self.config)
            .finish()
    }
}
757
758async fn create_and_validate_table_impl(
759    config: &IcebergConfig,
760    param: &SinkParam,
761) -> Result<Table> {
762    if config.create_table_if_not_exists {
763        create_table_if_not_exists_impl(config, param).await?;
764    }
765
766    let table = config
767        .load_table()
768        .await
769        .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
770
771    let sink_schema = param.schema();
772    let iceberg_arrow_schema = schema_to_arrow_schema(table.metadata().current_schema())
773        .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
774
775    try_matches_arrow_schema(&sink_schema, &iceberg_arrow_schema)
776        .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
777
778    Ok(table)
779}
780
781async fn create_table_if_not_exists_impl(config: &IcebergConfig, param: &SinkParam) -> Result<()> {
782    let catalog = config.create_catalog().await?;
783    let namespace = if let Some(database_name) = config.table.database_name() {
784        let namespace = NamespaceIdent::new(database_name.to_owned());
785        if !catalog
786            .namespace_exists(&namespace)
787            .await
788            .map_err(|e| SinkError::Iceberg(anyhow!(e)))?
789        {
790            catalog
791                .create_namespace(&namespace, HashMap::default())
792                .await
793                .map_err(|e| SinkError::Iceberg(anyhow!(e)))
794                .context("failed to create iceberg namespace")?;
795        }
796        namespace
797    } else {
798        bail!("database name must be set if you want to create table")
799    };
800
801    let table_id = config
802        .full_table_name()
803        .context("Unable to parse table name")?;
804    if !catalog
805        .table_exists(&table_id)
806        .await
807        .map_err(|e| SinkError::Iceberg(anyhow!(e)))?
808    {
809        let iceberg_create_table_arrow_convert = IcebergCreateTableArrowConvert::default();
810        // convert risingwave schema -> arrow schema -> iceberg schema
811        let arrow_fields = param
812            .columns
813            .iter()
814            .map(|column| {
815                Ok(iceberg_create_table_arrow_convert
816                    .to_arrow_field(&column.name, &column.data_type)
817                    .map_err(|e| SinkError::Iceberg(anyhow!(e)))
818                    .context(format!(
819                        "failed to convert {}: {} to arrow type",
820                        &column.name, &column.data_type
821                    ))?)
822            })
823            .collect::<Result<Vec<ArrowField>>>()?;
824        let arrow_schema = arrow_schema_iceberg::Schema::new(arrow_fields);
825        let iceberg_schema = iceberg::arrow::arrow_schema_to_schema(&arrow_schema)
826            .map_err(|e| SinkError::Iceberg(anyhow!(e)))
827            .context("failed to convert arrow schema to iceberg schema")?;
828
829        let location = {
830            let mut names = namespace.clone().inner();
831            names.push(config.table.table_name().to_owned());
832            match &config.common.warehouse_path {
833                Some(warehouse_path) => {
834                    let is_s3_tables = warehouse_path.starts_with("arn:aws:s3tables");
835                    // BigLake catalog federation uses bq:// prefix for BigQuery-managed Iceberg tables
836                    let is_bq_catalog_federation = warehouse_path.starts_with("bq://");
837                    let url = Url::parse(warehouse_path);
838                    if url.is_err() || is_s3_tables || is_bq_catalog_federation {
839                        // For rest catalog, the warehouse_path could be a warehouse name.
840                        // In this case, we should specify the location when creating a table.
841                        if config.common.catalog_type() == "rest"
842                            || config.common.catalog_type() == "rest_rust"
843                        {
844                            None
845                        } else {
846                            bail!(format!("Invalid warehouse path: {}", warehouse_path))
847                        }
848                    } else if warehouse_path.ends_with('/') {
849                        Some(format!("{}{}", warehouse_path, names.join("/")))
850                    } else {
851                        Some(format!("{}/{}", warehouse_path, names.join("/")))
852                    }
853                }
854                None => None,
855            }
856        };
857
858        let partition_spec = match &config.partition_by {
859            Some(partition_by) => {
860                let mut partition_fields = Vec::<UnboundPartitionField>::new();
861                for (i, (column, transform)) in parse_partition_by_exprs(partition_by.clone())?
862                    .into_iter()
863                    .enumerate()
864                {
865                    match iceberg_schema.field_id_by_name(&column) {
866                        Some(id) => partition_fields.push(
867                            UnboundPartitionField::builder()
868                                .source_id(id)
869                                .transform(transform)
870                                .name(format!("_p_{}", column))
871                                .field_id(PARTITION_DATA_ID_START + i as i32)
872                                .build(),
873                        ),
874                        None => bail!(format!(
875                            "Partition source column does not exist in schema: {}",
876                            column
877                        )),
878                    };
879                }
880                Some(
881                    UnboundPartitionSpec::builder()
882                        .with_spec_id(0)
883                        .add_partition_fields(partition_fields)
884                        .map_err(|e| SinkError::Iceberg(anyhow!(e)))
885                        .context("failed to add partition columns")?
886                        .build(),
887                )
888            }
889            None => None,
890        };
891
892        // Put format-version into table properties, because catalog like jdbc extract format-version from table properties.
893        let properties = HashMap::from([(
894            TableProperties::PROPERTY_FORMAT_VERSION.to_owned(),
895            (config.format_version as u8).to_string(),
896        )]);
897
898        let table_creation_builder = TableCreation::builder()
899            .name(config.table.table_name().to_owned())
900            .schema(iceberg_schema)
901            .format_version(config.table_format_version())
902            .properties(properties);
903
904        let table_creation = match (location, partition_spec) {
905            (Some(location), Some(partition_spec)) => table_creation_builder
906                .location(location)
907                .partition_spec(partition_spec)
908                .build(),
909            (Some(location), None) => table_creation_builder.location(location).build(),
910            (None, Some(partition_spec)) => table_creation_builder
911                .partition_spec(partition_spec)
912                .build(),
913            (None, None) => table_creation_builder.build(),
914        };
915
916        catalog
917            .create_table(&namespace, table_creation)
918            .await
919            .map_err(|e| SinkError::Iceberg(anyhow!(e)))
920            .context("failed to create iceberg table")?;
921    }
922    Ok(())
923}
924
925impl IcebergSink {
926    pub async fn create_and_validate_table(&self) -> Result<Table> {
927        create_and_validate_table_impl(&self.config, &self.param).await
928    }
929
930    pub async fn create_table_if_not_exists(&self) -> Result<()> {
931        create_table_if_not_exists_impl(&self.config, &self.param).await
932    }
933
934    pub fn new(config: IcebergConfig, param: SinkParam) -> Result<Self> {
935        let unique_column_ids = if config.r#type == SINK_TYPE_UPSERT && !config.force_append_only {
936            if let Some(pk) = &config.primary_key {
937                let mut unique_column_ids = Vec::with_capacity(pk.len());
938                for col_name in pk {
939                    let id = param
940                        .columns
941                        .iter()
942                        .find(|col| col.name.as_str() == col_name)
943                        .ok_or_else(|| {
944                            SinkError::Config(anyhow!(
945                                "Primary key column {} not found in sink schema",
946                                col_name
947                            ))
948                        })?
949                        .column_id
950                        .get_id() as usize;
951                    unique_column_ids.push(id);
952                }
953                Some(unique_column_ids)
954            } else {
955                unreachable!()
956            }
957        } else {
958            None
959        };
960        Ok(Self {
961            config,
962            param,
963            unique_column_ids,
964        })
965    }
966}
967
impl Sink for IcebergSink {
    type LogSinker = CoordinatedLogSinker<IcebergSinkWriter>;

    const SINK_NAME: &'static str = ICEBERG_SINK;

    /// Validate the sink before creation: catalog support, license gating,
    /// write-mode vs. compaction-type compatibility, and finally that the
    /// target table can be created/loaded for this sink's schema.
    async fn validate(&self) -> Result<()> {
        if "snowflake".eq_ignore_ascii_case(self.config.catalog_type()) {
            bail!("Snowflake catalog only supports iceberg sources");
        }

        if "glue".eq_ignore_ascii_case(self.config.catalog_type()) {
            // Glue-backed sinks are a licensed feature.
            risingwave_common::license::Feature::IcebergSinkWithGlue
                .check_available()
                .map_err(|e| anyhow::anyhow!(e))?;
        }

        // Enforce merge-on-read for append-only tables
        IcebergConfig::validate_append_only_write_mode(
            &self.config.r#type,
            self.config.write_mode,
        )?;

        // Validate compaction type configuration
        let compaction_type = self.config.compaction_type();

        // Check COW mode constraints
        // COW mode only supports 'full' compaction type
        if self.config.write_mode == IcebergWriteMode::CopyOnWrite
            && compaction_type != CompactionType::Full
        {
            bail!(
                "'copy-on-write' mode only supports 'full' compaction type, got: '{}'",
                compaction_type
            );
        }

        match compaction_type {
            CompactionType::SmallFiles => {
                // 1. check license
                risingwave_common::license::Feature::IcebergCompaction
                    .check_available()
                    .map_err(|e| anyhow::anyhow!(e))?;

                // 2. check write mode
                if self.config.write_mode != IcebergWriteMode::MergeOnRead {
                    bail!(
                        "'small-files' compaction type only supports 'merge-on-read' write mode, got: '{}'",
                        self.config.write_mode
                    );
                }

                // 3. check conflicting parameters
                if self.config.delete_files_count_threshold.is_some() {
                    bail!(
                        "`compaction.delete-files-count-threshold` is not supported for 'small-files' compaction type"
                    );
                }
            }
            CompactionType::FilesWithDelete => {
                // 1. check license
                risingwave_common::license::Feature::IcebergCompaction
                    .check_available()
                    .map_err(|e| anyhow::anyhow!(e))?;

                // 2. check write mode
                if self.config.write_mode != IcebergWriteMode::MergeOnRead {
                    bail!(
                        "'files-with-delete' compaction type only supports 'merge-on-read' write mode, got: '{}'",
                        self.config.write_mode
                    );
                }

                // 3. check conflicting parameters
                if self.config.small_files_threshold_mb.is_some() {
                    bail!(
                        "`compaction.small-files-threshold-mb` must not be set for 'files-with-delete' compaction type"
                    );
                }
            }
            CompactionType::Full => {
                // Full compaction has no special requirements
            }
        }

        // Creating/loading the table also validates it against the sink.
        let _ = self.create_and_validate_table().await?;
        Ok(())
    }

    fn support_schema_change() -> bool {
        true
    }

    /// Validate a proposed `ALTER SINK` config change without applying it.
    /// Only checks compaction/parquet-related knobs here.
    fn validate_alter_config(config: &BTreeMap<String, String>) -> Result<()> {
        let iceberg_config = IcebergConfig::from_btreemap(config.clone())?;

        // Validate compaction interval
        if let Some(compaction_interval) = iceberg_config.compaction_interval_sec {
            if iceberg_config.enable_compaction && compaction_interval == 0 {
                bail!(
                    "`compaction-interval-sec` must be greater than 0 when `enable-compaction` is true"
                );
            }

            // log compaction_interval
            tracing::info!(
                "Alter config compaction_interval set to {} seconds",
                compaction_interval
            );
        }

        // Validate max snapshots
        if let Some(max_snapshots) = iceberg_config.max_snapshots_num_before_compaction
            && max_snapshots < 1
        {
            bail!(
                "`compaction.max-snapshots-num` must be greater than 0, got: {}",
                max_snapshots
            );
        }

        // Validate target file size
        if let Some(target_file_size_mb) = iceberg_config.target_file_size_mb
            && target_file_size_mb == 0
        {
            bail!("`compaction.target_file_size_mb` must be greater than 0");
        }

        // Validate parquet max row group rows
        if let Some(max_row_group_rows) = iceberg_config.write_parquet_max_row_group_rows
            && max_row_group_rows == 0
        {
            bail!("`compaction.write_parquet_max_row_group_rows` must be greater than 0");
        }

        // Validate parquet compression codec
        if let Some(ref compression) = iceberg_config.write_parquet_compression {
            let valid_codecs = [
                "uncompressed",
                "snappy",
                "gzip",
                "lzo",
                "brotli",
                "lz4",
                "zstd",
            ];
            if !valid_codecs.contains(&compression.to_lowercase().as_str()) {
                bail!(
                    "`compaction.write_parquet_compression` must be one of {:?}, got: {}",
                    valid_codecs,
                    compression
                );
            }
        }

        Ok(())
    }

    /// Wrap the Iceberg writer in a coordinated log sinker that commits every
    /// `commit_checkpoint_interval` checkpoints.
    async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
        let writer = IcebergSinkWriter::new(
            self.config.clone(),
            self.param.clone(),
            writer_param.clone(),
            self.unique_column_ids.clone(),
        );

        let commit_checkpoint_interval =
            NonZeroU64::new(self.config.commit_checkpoint_interval).expect(
                "commit_checkpoint_interval should be greater than 0, and it should be checked in config validation",
            );
        let log_sinker = CoordinatedLogSinker::new(
            &writer_param,
            self.param.clone(),
            writer,
            commit_checkpoint_interval,
        )
        .await?;

        Ok(log_sinker)
    }

    fn is_coordinated_sink(&self) -> bool {
        true
    }

    /// Build the commit coordinator; two-phase commit is used when
    /// `is_exactly_once` is enabled, single-phase otherwise.
    async fn new_coordinator(
        &self,
        iceberg_compact_stat_sender: Option<UnboundedSender<IcebergSinkCompactionUpdate>>,
    ) -> Result<SinkCommitCoordinator> {
        let catalog = self.config.create_catalog().await?;
        let table = self.create_and_validate_table().await?;
        let coordinator = IcebergSinkCommitter {
            catalog,
            table,
            last_commit_epoch: 0,
            sink_id: self.param.sink_id,
            config: self.config.clone(),
            param: self.param.clone(),
            commit_retry_num: self.config.commit_retry_num,
            iceberg_compact_stat_sender,
        };
        if self.config.is_exactly_once.unwrap_or_default() {
            Ok(SinkCommitCoordinator::TwoPhase(Box::new(coordinator)))
        } else {
            Ok(SinkCommitCoordinator::SinglePhase(Box::new(coordinator)))
        }
    }
}
1175
/// Projection state used to drop the extra partition column before writing.
///
/// - `None`: no projection is needed.
/// - `Prepare(idx)`: holds the extra partition column index; the projection
///   index vector is built lazily from the first data chunk's schema.
/// - `Done(vec)`: the resolved list of column indices to keep.
enum ProjectIdxVec {
    None,
    Prepare(usize),
    Done(Vec<usize>),
}
1187
// Concrete writer-stack aliases; all use Parquet files with the default
// location and file-name generators.

/// Builder for plain data files.
type DataFileWriterBuilderType =
    DataFileWriterBuilder<ParquetWriterBuilder, DefaultLocationGenerator, DefaultFileNameGenerator>;
/// Builder for position-delete files.
type PositionDeleteFileWriterBuilderType = PositionDeleteFileWriterBuilder<
    ParquetWriterBuilder,
    DefaultLocationGenerator,
    DefaultFileNameGenerator,
>;
/// Writer for position-delete files.
type PositionDeleteFileWriterType = PositionDeleteFileWriter<
    ParquetWriterBuilder,
    DefaultLocationGenerator,
    DefaultFileNameGenerator,
>;
/// Builder for deletion-vector (Puffin) writers.
type DeletionVectorWriterBuilderType =
    DeletionVectorWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>;
/// Deletion-vector writer.
type DeletionVectorWriterType =
    DeletionVectorWriter<DefaultLocationGenerator, DefaultFileNameGenerator>;
/// Builder for equality-delete files.
type EqualityDeleteFileWriterBuilderType = EqualityDeleteFileWriterBuilder<
    ParquetWriterBuilder,
    DefaultLocationGenerator,
    DefaultFileNameGenerator,
>;
1209
/// Position-delete builder: either a classic position-delete file writer or a
/// deletion-vector writer (chosen from the table's format version upstream).
#[derive(Clone)]
enum PositionDeleteWriterBuilderType {
    PositionDelete(PositionDeleteFileWriterBuilderType),
    DeletionVector(DeletionVectorWriterBuilderType),
}

/// Writer counterpart of [`PositionDeleteWriterBuilderType`].
enum PositionDeleteWriterType {
    PositionDelete(PositionDeleteFileWriterType),
    DeletionVector(DeletionVectorWriterType),
}
1220
1221#[async_trait]
1222impl IcebergWriterBuilder<Vec<PositionDeleteInput>> for PositionDeleteWriterBuilderType {
1223    type R = PositionDeleteWriterType;
1224
1225    async fn build(
1226        &self,
1227        partition_key: Option<iceberg::spec::PartitionKey>,
1228    ) -> iceberg::Result<Self::R> {
1229        match self {
1230            PositionDeleteWriterBuilderType::PositionDelete(builder) => Ok(
1231                PositionDeleteWriterType::PositionDelete(builder.build(partition_key).await?),
1232            ),
1233            PositionDeleteWriterBuilderType::DeletionVector(builder) => Ok(
1234                PositionDeleteWriterType::DeletionVector(builder.build(partition_key).await?),
1235            ),
1236        }
1237    }
1238}
1239
1240#[async_trait]
1241impl IcebergWriter<Vec<PositionDeleteInput>> for PositionDeleteWriterType {
1242    async fn write(&mut self, input: Vec<PositionDeleteInput>) -> iceberg::Result<()> {
1243        match self {
1244            PositionDeleteWriterType::PositionDelete(writer) => writer.write(input).await,
1245            PositionDeleteWriterType::DeletionVector(writer) => writer.write(input).await,
1246        }
1247    }
1248
1249    async fn close(&mut self) -> iceberg::Result<Vec<DataFile>> {
1250        match self {
1251            PositionDeleteWriterType::PositionDelete(writer) => writer.close().await,
1252            PositionDeleteWriterType::DeletionVector(writer) => writer.close().await,
1253        }
1254    }
1255}
1256
/// Cheaply cloneable handle sharing one writer builder across task writers.
#[derive(Clone)]
struct SharedIcebergWriterBuilder<B>(Arc<B>);

#[async_trait]
impl<B: IcebergWriterBuilder> IcebergWriterBuilder for SharedIcebergWriterBuilder<B> {
    type R = B::R;

    /// Delegate building to the shared inner builder.
    async fn build(
        &self,
        partition_key: Option<iceberg::spec::PartitionKey>,
    ) -> iceberg::Result<Self::R> {
        self.0.build(partition_key).await
    }
}
1271
/// Wraps a writer builder with the table context (schema, partition spec)
/// needed to construct a `TaskWriter`.
#[derive(Clone)]
struct TaskWriterBuilderWrapper<B: IcebergWriterBuilder> {
    inner: Arc<B>,
    // Whether the task writer keeps per-partition writers open.
    fanout_enabled: bool,
    schema: IcebergSchemaRef,
    partition_spec: PartitionSpecRef,
    // True when partition values must be computed from the data columns;
    // false when they are precomputed in the input batches.
    compute_partition: bool,
}
1280
1281impl<B: IcebergWriterBuilder> TaskWriterBuilderWrapper<B> {
1282    fn new(
1283        inner: B,
1284        fanout_enabled: bool,
1285        schema: IcebergSchemaRef,
1286        partition_spec: PartitionSpecRef,
1287        compute_partition: bool,
1288    ) -> Self {
1289        Self {
1290            inner: Arc::new(inner),
1291            fanout_enabled,
1292            schema,
1293            partition_spec,
1294            compute_partition,
1295        }
1296    }
1297
1298    fn build(&self) -> iceberg::Result<TaskWriter<SharedIcebergWriterBuilder<B>>> {
1299        let partition_splitter = match (
1300            self.partition_spec.is_unpartitioned(),
1301            self.compute_partition,
1302        ) {
1303            (true, _) => None,
1304            (false, true) => Some(RecordBatchPartitionSplitter::try_new_with_computed_values(
1305                self.schema.clone(),
1306                self.partition_spec.clone(),
1307            )?),
1308            (false, false) => Some(
1309                RecordBatchPartitionSplitter::try_new_with_precomputed_values(
1310                    self.schema.clone(),
1311                    self.partition_spec.clone(),
1312                )?,
1313            ),
1314        };
1315
1316        Ok(TaskWriter::new_with_partition_splitter(
1317            SharedIcebergWriterBuilder(self.inner.clone()),
1318            self.fanout_enabled,
1319            self.schema.clone(),
1320            self.partition_spec.clone(),
1321            partition_splitter,
1322        ))
1323    }
1324}
1325
/// Iceberg sink writer as a two-stage state machine: `Created` only stores
/// the construction args; the expensive table/writer setup happens later,
/// producing `Initialized`.
pub enum IcebergSinkWriter {
    Created(IcebergSinkWriterArgs),
    Initialized(IcebergSinkWriterInner),
}
1330
/// Arguments captured at construction time, consumed by deferred
/// initialization of the writer.
pub struct IcebergSinkWriterArgs {
    config: IcebergConfig,
    sink_param: SinkParam,
    writer_param: SinkWriterParam,
    // Column ids forming the upsert key; `None` for append-only sinks.
    unique_column_ids: Option<Vec<usize>>,
}
1337
/// Fully initialized writer state: the dispatching writer plus the table
/// handle, metrics, and checkpoint-commit bookkeeping.
pub struct IcebergSinkWriterInner {
    writer: IcebergWriterDispatch,
    arrow_schema: SchemaRef,
    // See comments below
    metrics: IcebergWriterMetrics,
    // State of iceberg table for this writer
    table: Table,
    // For chunk with extra partition column, we should remove this column before write.
    // This project index vec is used to avoid create project idx each time.
    project_idx_vec: ProjectIdxVec,
    // When set, a checkpoint commits early once this many bytes are pending.
    commit_checkpoint_size_threshold_bytes: Option<u64>,
    // Bytes written since the last commit; compared against the threshold.
    uncommitted_write_bytes: u64,
}
1351
/// Concrete writer stack, selected by sink type (append-only vs. upsert).
#[allow(clippy::type_complexity)]
enum IcebergWriterDispatch {
    // Append-only sinks write plain data files.
    Append {
        // `None` when no writer is currently active.
        writer: Option<Box<dyn IcebergWriter>>,
        writer_builder:
            TaskWriterBuilderWrapper<MonitoredGeneralWriterBuilder<DataFileWriterBuilderType>>,
    },
    // Upsert sinks use a delta writer combining data, position-delete and
    // equality-delete writers.
    Upsert {
        writer: Option<Box<dyn IcebergWriter>>,
        writer_builder: TaskWriterBuilderWrapper<
            MonitoredGeneralWriterBuilder<
                DeltaWriterBuilder<
                    DataFileWriterBuilderType,
                    PositionDeleteWriterBuilderType,
                    EqualityDeleteFileWriterBuilderType,
                >,
            >,
        >,
        // Input schema extended with the insert/delete op column.
        arrow_schema_with_op_column: SchemaRef,
    },
}
1373
1374impl IcebergWriterDispatch {
1375    pub fn get_writer(&mut self) -> Option<&mut Box<dyn IcebergWriter>> {
1376        match self {
1377            IcebergWriterDispatch::Append { writer, .. }
1378            | IcebergWriterDispatch::Upsert { writer, .. } => writer.as_mut(),
1379        }
1380    }
1381}
1382
/// Metric handles owned by the writer for the lifetime of the sink.
pub struct IcebergWriterMetrics {
    // NOTE: These 2 metrics are not used directly by us, but only kept for lifecycle management.
    // They are actually used in `PrometheusWriterBuilder`:
    //     WriterMetrics::new(write_qps.deref().clone(), write_latency.deref().clone())
    // We keep them here to let the guard cleans the labels from metrics registry when dropped
    _write_qps: LabelGuardedIntCounter,
    _write_latency: LabelGuardedHistogram,
    // Unlike the two above, this counter is used directly by the writer.
    write_bytes: LabelGuardedIntCounter,
}
1392
1393impl IcebergSinkWriter {
1394    pub fn new(
1395        config: IcebergConfig,
1396        sink_param: SinkParam,
1397        writer_param: SinkWriterParam,
1398        unique_column_ids: Option<Vec<usize>>,
1399    ) -> Self {
1400        Self::Created(IcebergSinkWriterArgs {
1401            config,
1402            sink_param,
1403            writer_param,
1404            unique_column_ids,
1405        })
1406    }
1407}
1408
1409impl IcebergSinkWriterInner {
1410    fn should_commit_on_checkpoint(&self) -> bool {
1411        self.commit_checkpoint_size_threshold_bytes
1412            .is_some_and(|threshold| {
1413                self.uncommitted_write_bytes > 0 && self.uncommitted_write_bytes >= threshold
1414            })
1415    }
1416
    /// Build the writer state for an append-only sink: a monitored data-file
    /// writer stack (Parquet -> rolling file -> task writer) plus metrics.
    fn build_append_only(
        config: &IcebergConfig,
        table: Table,
        writer_param: &SinkWriterParam,
    ) -> Result<Self> {
        let SinkWriterParam {
            extra_partition_col_idx,
            actor_id,
            sink_id,
            sink_name,
            ..
        } = writer_param;
        // Metric label set: (actor id, sink id, sink name).
        let metrics_labels = [
            &actor_id.to_string(),
            &sink_id.to_string(),
            sink_name.as_str(),
        ];

        // Metrics
        let write_qps = GLOBAL_SINK_METRICS
            .iceberg_write_qps
            .with_guarded_label_values(&metrics_labels);
        let write_latency = GLOBAL_SINK_METRICS
            .iceberg_write_latency
            .with_guarded_label_values(&metrics_labels);
        // # TODO
        // Unused. Add this metrics later.
        let _rolling_unflushed_data_file = GLOBAL_SINK_METRICS
            .iceberg_rolling_unflushed_data_file
            .with_guarded_label_values(&metrics_labels);
        let write_bytes = GLOBAL_SINK_METRICS
            .iceberg_write_bytes
            .with_guarded_label_values(&metrics_labels);

        let schema = table.metadata().current_schema();
        let partition_spec = table.metadata().default_partition_spec();
        // Fan out (one writer per partition) only for partitioned tables.
        let fanout_enabled = !partition_spec.fields().is_empty();
        // To avoid duplicate file name, each time the sink created will generate a unique uuid as file name suffix.
        let unique_uuid_suffix = Uuid::now_v7();

        let parquet_writer_properties = WriterProperties::builder()
            .set_compression(config.get_parquet_compression())
            .set_max_row_group_size(config.write_parquet_max_row_group_rows())
            .set_created_by(PARQUET_CREATED_BY.to_owned())
            .build();

        let parquet_writer_builder =
            ParquetWriterBuilder::new(parquet_writer_properties, schema.clone());
        // Roll over to a new file once the configured target size is reached.
        let rolling_builder = RollingFileWriterBuilder::new(
            parquet_writer_builder,
            (config.target_file_size_mb() * 1024 * 1024) as usize,
            table.file_io().clone(),
            DefaultLocationGenerator::new(table.metadata().clone())
                .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
            DefaultFileNameGenerator::new(
                writer_param.actor_id.to_string(),
                Some(unique_uuid_suffix.to_string()),
                iceberg::spec::DataFileFormat::Parquet,
            ),
        );
        let data_file_builder = DataFileWriterBuilder::new(rolling_builder);
        // Wrap with QPS/latency instrumentation.
        let monitored_builder = MonitoredGeneralWriterBuilder::new(
            data_file_builder,
            write_qps.clone(),
            write_latency.clone(),
        );
        let writer_builder = TaskWriterBuilderWrapper::new(
            monitored_builder,
            fanout_enabled,
            schema.clone(),
            partition_spec.clone(),
            true, // compute_partition: derive partition values from the data
        );
        let inner_writer = Some(Box::new(
            writer_builder
                .build()
                .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
        ) as Box<dyn IcebergWriter>);
        Ok(Self {
            arrow_schema: Arc::new(
                schema_to_arrow_schema(table.metadata().current_schema())
                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
            ),
            metrics: IcebergWriterMetrics {
                _write_qps: write_qps,
                _write_latency: write_latency,
                write_bytes,
            },
            writer: IcebergWriterDispatch::Append {
                writer: inner_writer,
                writer_builder,
            },
            table,
            project_idx_vec: {
                if let Some(extra_partition_col_idx) = extra_partition_col_idx {
                    ProjectIdxVec::Prepare(*extra_partition_col_idx)
                } else {
                    ProjectIdxVec::None
                }
            },
            commit_checkpoint_size_threshold_bytes: config.commit_checkpoint_size_threshold_bytes(),
            uncommitted_write_bytes: 0,
        })
    }
1521
1522    fn build_upsert(
1523        config: &IcebergConfig,
1524        table: Table,
1525        unique_column_ids: Vec<usize>,
1526        writer_param: &SinkWriterParam,
1527    ) -> Result<Self> {
1528        let SinkWriterParam {
1529            extra_partition_col_idx,
1530            actor_id,
1531            sink_id,
1532            sink_name,
1533            ..
1534        } = writer_param;
1535        let metrics_labels = [
1536            &actor_id.to_string(),
1537            &sink_id.to_string(),
1538            sink_name.as_str(),
1539        ];
1540        let unique_column_ids: Vec<_> = unique_column_ids.into_iter().map(|id| id as i32).collect();
1541
1542        // Metrics
1543        let write_qps = GLOBAL_SINK_METRICS
1544            .iceberg_write_qps
1545            .with_guarded_label_values(&metrics_labels);
1546        let write_latency = GLOBAL_SINK_METRICS
1547            .iceberg_write_latency
1548            .with_guarded_label_values(&metrics_labels);
1549        // # TODO
1550        // Unused. Add this metrics later.
1551        let _rolling_unflushed_data_file = GLOBAL_SINK_METRICS
1552            .iceberg_rolling_unflushed_data_file
1553            .with_guarded_label_values(&metrics_labels);
1554        let write_bytes = GLOBAL_SINK_METRICS
1555            .iceberg_write_bytes
1556            .with_guarded_label_values(&metrics_labels);
1557
1558        // Determine the schema id and partition spec id
1559        let schema = table.metadata().current_schema();
1560        let partition_spec = table.metadata().default_partition_spec();
1561        let fanout_enabled = !partition_spec.fields().is_empty();
1562        let use_deletion_vectors = table.metadata().format_version() >= FormatVersion::V3;
1563
1564        // To avoid duplicate file name, each time the sink created will generate a unique uuid as file name suffix.
1565        let unique_uuid_suffix = Uuid::now_v7();
1566
1567        let parquet_writer_properties = WriterProperties::builder()
1568            .set_compression(config.get_parquet_compression())
1569            .set_max_row_group_size(config.write_parquet_max_row_group_rows())
1570            .set_created_by(PARQUET_CREATED_BY.to_owned())
1571            .build();
1572
1573        let data_file_builder = {
1574            let parquet_writer_builder =
1575                ParquetWriterBuilder::new(parquet_writer_properties.clone(), schema.clone());
1576            let rolling_writer_builder = RollingFileWriterBuilder::new(
1577                parquet_writer_builder,
1578                (config.target_file_size_mb() * 1024 * 1024) as usize,
1579                table.file_io().clone(),
1580                DefaultLocationGenerator::new(table.metadata().clone())
1581                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1582                DefaultFileNameGenerator::new(
1583                    writer_param.actor_id.to_string(),
1584                    Some(unique_uuid_suffix.to_string()),
1585                    iceberg::spec::DataFileFormat::Parquet,
1586                ),
1587            );
1588            DataFileWriterBuilder::new(rolling_writer_builder)
1589        };
1590        let position_delete_builder = if use_deletion_vectors {
1591            let location_generator = DefaultLocationGenerator::new(table.metadata().clone())
1592                .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
1593            PositionDeleteWriterBuilderType::DeletionVector(DeletionVectorWriterBuilder::new(
1594                table.file_io().clone(),
1595                location_generator,
1596                DefaultFileNameGenerator::new(
1597                    writer_param.actor_id.to_string(),
1598                    Some(format!("delvec-{}", unique_uuid_suffix)),
1599                    iceberg::spec::DataFileFormat::Puffin,
1600                ),
1601            ))
1602        } else {
1603            let parquet_writer_builder = ParquetWriterBuilder::new(
1604                parquet_writer_properties.clone(),
1605                POSITION_DELETE_SCHEMA.clone().into(),
1606            );
1607            let rolling_writer_builder = RollingFileWriterBuilder::new(
1608                parquet_writer_builder,
1609                (config.target_file_size_mb() * 1024 * 1024) as usize,
1610                table.file_io().clone(),
1611                DefaultLocationGenerator::new(table.metadata().clone())
1612                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1613                DefaultFileNameGenerator::new(
1614                    writer_param.actor_id.to_string(),
1615                    Some(format!("pos-del-{}", unique_uuid_suffix)),
1616                    iceberg::spec::DataFileFormat::Parquet,
1617                ),
1618            );
1619            PositionDeleteWriterBuilderType::PositionDelete(PositionDeleteFileWriterBuilder::new(
1620                rolling_writer_builder,
1621            ))
1622        };
1623        let equality_delete_builder = {
1624            let eq_del_config = EqualityDeleteWriterConfig::new(
1625                unique_column_ids.clone(),
1626                table.metadata().current_schema().clone(),
1627            )
1628            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
1629            let parquet_writer_builder = ParquetWriterBuilder::new(
1630                parquet_writer_properties,
1631                Arc::new(
1632                    arrow_schema_to_schema(eq_del_config.projected_arrow_schema_ref())
1633                        .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1634                ),
1635            );
1636            let rolling_writer_builder = RollingFileWriterBuilder::new(
1637                parquet_writer_builder,
1638                (config.target_file_size_mb() * 1024 * 1024) as usize,
1639                table.file_io().clone(),
1640                DefaultLocationGenerator::new(table.metadata().clone())
1641                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1642                DefaultFileNameGenerator::new(
1643                    writer_param.actor_id.to_string(),
1644                    Some(format!("eq-del-{}", unique_uuid_suffix)),
1645                    iceberg::spec::DataFileFormat::Parquet,
1646                ),
1647            );
1648
1649            EqualityDeleteFileWriterBuilder::new(rolling_writer_builder, eq_del_config)
1650        };
1651        let delta_builder = DeltaWriterBuilder::new(
1652            data_file_builder,
1653            position_delete_builder,
1654            equality_delete_builder,
1655            unique_column_ids,
1656            schema.clone(),
1657        );
1658        let original_arrow_schema = Arc::new(
1659            schema_to_arrow_schema(table.metadata().current_schema())
1660                .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1661        );
1662        let schema_with_extra_op_column = {
1663            let mut new_fields = original_arrow_schema.fields().iter().cloned().collect_vec();
1664            new_fields.push(Arc::new(ArrowField::new(
1665                "op".to_owned(),
1666                ArrowDataType::Int32,
1667                false,
1668            )));
1669            Arc::new(ArrowSchema::new(new_fields))
1670        };
1671        let writer_builder = TaskWriterBuilderWrapper::new(
1672            MonitoredGeneralWriterBuilder::new(
1673                delta_builder,
1674                write_qps.clone(),
1675                write_latency.clone(),
1676            ),
1677            fanout_enabled,
1678            schema.clone(),
1679            partition_spec.clone(),
1680            true,
1681        );
1682        let inner_writer = Some(Box::new(
1683            writer_builder
1684                .build()
1685                .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1686        ) as Box<dyn IcebergWriter>);
1687        Ok(Self {
1688            arrow_schema: original_arrow_schema,
1689            metrics: IcebergWriterMetrics {
1690                _write_qps: write_qps,
1691                _write_latency: write_latency,
1692                write_bytes,
1693            },
1694            table,
1695            writer: IcebergWriterDispatch::Upsert {
1696                writer: inner_writer,
1697                writer_builder,
1698                arrow_schema_with_op_column: schema_with_extra_op_column,
1699            },
1700            project_idx_vec: {
1701                if let Some(extra_partition_col_idx) = extra_partition_col_idx {
1702                    ProjectIdxVec::Prepare(*extra_partition_col_idx)
1703                } else {
1704                    ProjectIdxVec::None
1705                }
1706            },
1707            commit_checkpoint_size_threshold_bytes: config.commit_checkpoint_size_threshold_bytes(),
1708            uncommitted_write_bytes: 0,
1709        })
1710    }
1711}
1712
1713#[async_trait]
1714impl SinkWriter for IcebergSinkWriter {
1715    type CommitMetadata = Option<SinkMetadata>;
1716
1717    /// Begin a new epoch
1718    async fn begin_epoch(&mut self, _epoch: u64) -> Result<()> {
1719        let Self::Created(args) = self else {
1720            return Ok(());
1721        };
1722
1723        let table = create_and_validate_table_impl(&args.config, &args.sink_param).await?;
1724        let inner = match &args.unique_column_ids {
1725            Some(unique_column_ids) => IcebergSinkWriterInner::build_upsert(
1726                &args.config,
1727                table,
1728                unique_column_ids.clone(),
1729                &args.writer_param,
1730            )?,
1731            None => {
1732                IcebergSinkWriterInner::build_append_only(&args.config, table, &args.writer_param)?
1733            }
1734        };
1735
1736        *self = IcebergSinkWriter::Initialized(inner);
1737        Ok(())
1738    }
1739
1740    /// Write a stream chunk to sink
1741    async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> {
1742        let Self::Initialized(inner) = self else {
1743            unreachable!("IcebergSinkWriter should be initialized before barrier");
1744        };
1745
1746        // Try to build writer if it's None.
1747        match &mut inner.writer {
1748            IcebergWriterDispatch::Append {
1749                writer,
1750                writer_builder,
1751            } => {
1752                if writer.is_none() {
1753                    *writer = Some(Box::new(
1754                        writer_builder
1755                            .build()
1756                            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1757                    ));
1758                }
1759            }
1760            IcebergWriterDispatch::Upsert {
1761                writer,
1762                writer_builder,
1763                ..
1764            } => {
1765                if writer.is_none() {
1766                    *writer = Some(Box::new(
1767                        writer_builder
1768                            .build()
1769                            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1770                    ));
1771                }
1772            }
1773        };
1774
1775        // Process the chunk.
1776        let (mut chunk, ops) = chunk.compact_vis().into_parts();
1777        match &mut inner.project_idx_vec {
1778            ProjectIdxVec::None => {}
1779            ProjectIdxVec::Prepare(idx) => {
1780                if *idx >= chunk.columns().len() {
1781                    return Err(SinkError::Iceberg(anyhow!(
1782                        "invalid extra partition column index {}",
1783                        idx
1784                    )));
1785                }
1786                let project_idx_vec = (0..*idx)
1787                    .chain(*idx + 1..chunk.columns().len())
1788                    .collect_vec();
1789                chunk = chunk.project(&project_idx_vec);
1790                inner.project_idx_vec = ProjectIdxVec::Done(project_idx_vec);
1791            }
1792            ProjectIdxVec::Done(idx_vec) => {
1793                chunk = chunk.project(idx_vec);
1794            }
1795        }
1796        if ops.is_empty() {
1797            return Ok(());
1798        }
1799        let write_batch_size = chunk.estimated_heap_size();
1800        let batch = match &inner.writer {
1801            IcebergWriterDispatch::Append { .. } => {
1802                // separate out insert chunk
1803                let filters =
1804                    chunk.visibility() & ops.iter().map(|op| *op == Op::Insert).collect::<Bitmap>();
1805                chunk.set_visibility(filters);
1806                IcebergArrowConvert
1807                    .to_record_batch(inner.arrow_schema.clone(), &chunk.compact_vis())
1808                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?
1809            }
1810            IcebergWriterDispatch::Upsert {
1811                arrow_schema_with_op_column,
1812                ..
1813            } => {
1814                let chunk = IcebergArrowConvert
1815                    .to_record_batch(inner.arrow_schema.clone(), &chunk)
1816                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
1817                let ops = Arc::new(Int32Array::from(
1818                    ops.iter()
1819                        .map(|op| match op {
1820                            Op::UpdateInsert | Op::Insert => INSERT_OP,
1821                            Op::UpdateDelete | Op::Delete => DELETE_OP,
1822                        })
1823                        .collect_vec(),
1824                ));
1825                let mut columns = chunk.columns().to_vec();
1826                columns.push(ops);
1827                RecordBatch::try_new(arrow_schema_with_op_column.clone(), columns)
1828                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?
1829            }
1830        };
1831
1832        let writer = inner.writer.get_writer().unwrap();
1833        writer
1834            .write(batch)
1835            .instrument_await("iceberg_write")
1836            .await
1837            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
1838        inner.metrics.write_bytes.inc_by(write_batch_size as _);
1839        inner.uncommitted_write_bytes = inner
1840            .uncommitted_write_bytes
1841            .saturating_add(write_batch_size as u64);
1842        Ok(())
1843    }
1844
1845    fn should_commit_on_checkpoint(&self) -> bool {
1846        match self {
1847            Self::Initialized(inner) => inner.should_commit_on_checkpoint(),
1848            Self::Created(_) => false,
1849        }
1850    }
1851
1852    /// Receive a barrier and mark the end of current epoch. When `is_checkpoint` is true, the sink
1853    /// writer should commit the current epoch.
1854    async fn barrier(&mut self, is_checkpoint: bool) -> Result<Option<SinkMetadata>> {
1855        let Self::Initialized(inner) = self else {
1856            unreachable!("IcebergSinkWriter should be initialized before barrier");
1857        };
1858
1859        // Skip it if not checkpoint
1860        if !is_checkpoint {
1861            return Ok(None);
1862        }
1863
1864        let close_result = match &mut inner.writer {
1865            IcebergWriterDispatch::Append {
1866                writer,
1867                writer_builder,
1868            } => {
1869                let close_result = match writer.take() {
1870                    Some(mut writer) => {
1871                        Some(writer.close().instrument_await("iceberg_close").await)
1872                    }
1873                    _ => None,
1874                };
1875                match writer_builder.build() {
1876                    Ok(new_writer) => {
1877                        *writer = Some(Box::new(new_writer));
1878                    }
1879                    _ => {
1880                        // In this case, the writer is closed and we can't build a new writer. But we can't return the error
1881                        // here because current writer may close successfully. So we just log the error.
1882                        warn!("Failed to build new writer after close");
1883                    }
1884                }
1885                close_result
1886            }
1887            IcebergWriterDispatch::Upsert {
1888                writer,
1889                writer_builder,
1890                ..
1891            } => {
1892                let close_result = match writer.take() {
1893                    Some(mut writer) => {
1894                        Some(writer.close().instrument_await("iceberg_close").await)
1895                    }
1896                    _ => None,
1897                };
1898                match writer_builder.build() {
1899                    Ok(new_writer) => {
1900                        *writer = Some(Box::new(new_writer));
1901                    }
1902                    _ => {
1903                        // In this case, the writer is closed and we can't build a new writer. But we can't return the error
1904                        // here because current writer may close successfully. So we just log the error.
1905                        warn!("Failed to build new writer after close");
1906                    }
1907                }
1908                close_result
1909            }
1910        };
1911
1912        match close_result {
1913            Some(Ok(result)) => {
1914                inner.uncommitted_write_bytes = 0;
1915                let format_version = inner.table.metadata().format_version();
1916                let partition_type = inner.table.metadata().default_partition_type();
1917                let data_files = result
1918                    .into_iter()
1919                    .map(|f| {
1920                        // Truncate large column statistics BEFORE serialization
1921                        let truncated = truncate_datafile(f);
1922                        SerializedDataFile::try_from(truncated, partition_type, format_version)
1923                            .map_err(|err| SinkError::Iceberg(anyhow!(err)))
1924                    })
1925                    .collect::<Result<Vec<_>>>()?;
1926                Ok(Some(SinkMetadata::try_from(&IcebergCommitResult {
1927                    data_files,
1928                    schema_id: inner.table.metadata().current_schema_id(),
1929                    partition_spec_id: inner.table.metadata().default_partition_spec_id(),
1930                })?))
1931            }
1932            Some(Err(err)) => Err(SinkError::Iceberg(anyhow!(err))),
1933            None => Err(SinkError::Iceberg(anyhow!("No writer to close"))),
1934        }
1935    }
1936}
1937
/// JSON object keys used to (de)serialize [`IcebergCommitResult`] into the
/// sink metadata exchanged between the sink writers and the commit coordinator.
const SCHEMA_ID: &str = "schema_id";
const PARTITION_SPEC_ID: &str = "partition_spec_id";
const DATA_FILES: &str = "data_files";

/// Maximum size for column statistics (min/max values) in bytes.
/// Column statistics larger than this will be truncated to avoid metadata bloat.
/// This is especially important for large fields like JSONB, TEXT, BINARY, etc.
///
/// Fix for large column statistics in `DataFile` metadata that can cause OOM errors.
/// We truncate at the `DataFile` level (before serialization) by directly modifying
/// the public `lower_bounds` and `upper_bounds` fields.
///
/// This prevents metadata from ballooning to gigabytes when dealing with large
/// JSONB, TEXT, or BINARY fields, while still preserving statistics for small fields
/// that benefit from query optimization.
const MAX_COLUMN_STAT_SIZE: usize = 10240; // 10KB
1954
1955/// Truncate large column statistics from `DataFile` BEFORE serialization.
1956///
1957/// This function directly modifies `DataFile`'s `lower_bounds` and `upper_bounds`
1958/// to remove entries that exceed `MAX_COLUMN_STAT_SIZE`.
1959///
1960/// # Arguments
1961/// * `data_file` - A `DataFile` to process
1962///
1963/// # Returns
1964/// The modified `DataFile` with large statistics truncated
1965fn truncate_datafile(mut data_file: DataFile) -> DataFile {
1966    // Process lower_bounds - remove entries with large values
1967    data_file.lower_bounds.retain(|field_id, datum| {
1968        // Use to_bytes() to get the actual binary size without JSON serialization overhead
1969        let size = match datum.to_bytes() {
1970            Ok(bytes) => bytes.len(),
1971            Err(_) => 0,
1972        };
1973
1974        if size > MAX_COLUMN_STAT_SIZE {
1975            tracing::debug!(
1976                field_id = field_id,
1977                size = size,
1978                "Truncating large lower_bound statistic"
1979            );
1980            return false;
1981        }
1982        true
1983    });
1984
1985    // Process upper_bounds - remove entries with large values
1986    data_file.upper_bounds.retain(|field_id, datum| {
1987        // Use to_bytes() to get the actual binary size without JSON serialization overhead
1988        let size = match datum.to_bytes() {
1989            Ok(bytes) => bytes.len(),
1990            Err(_) => 0,
1991        };
1992
1993        if size > MAX_COLUMN_STAT_SIZE {
1994            tracing::debug!(
1995                field_id = field_id,
1996                size = size,
1997                "Truncating large upper_bound statistic"
1998            );
1999            return false;
2000        }
2001        true
2002    });
2003
2004    data_file
2005}
2006
/// Per-parallelism write result exchanged between the sink writers and the
/// commit coordinator (carried inside `SinkMetadata` as JSON).
#[derive(Default, Clone)]
struct IcebergCommitResult {
    /// Schema id the data files were written against.
    schema_id: i32,
    /// Partition spec id the data files were written against.
    partition_spec_id: i32,
    /// Data files produced in this checkpoint, in serialized form.
    data_files: Vec<SerializedDataFile>,
}
2013
2014impl IcebergCommitResult {
2015    fn try_from(value: &SinkMetadata) -> Result<Self> {
2016        if let Some(Serialized(v)) = &value.metadata {
2017            let mut values = if let serde_json::Value::Object(v) =
2018                serde_json::from_slice::<serde_json::Value>(&v.metadata)
2019                    .context("Can't parse iceberg sink metadata")?
2020            {
2021                v
2022            } else {
2023                bail!("iceberg sink metadata should be an object");
2024            };
2025
2026            let schema_id;
2027            if let Some(serde_json::Value::Number(value)) = values.remove(SCHEMA_ID) {
2028                schema_id = value
2029                    .as_u64()
2030                    .ok_or_else(|| anyhow!("schema_id should be a u64"))?;
2031            } else {
2032                bail!("iceberg sink metadata should have schema_id");
2033            }
2034
2035            let partition_spec_id;
2036            if let Some(serde_json::Value::Number(value)) = values.remove(PARTITION_SPEC_ID) {
2037                partition_spec_id = value
2038                    .as_u64()
2039                    .ok_or_else(|| anyhow!("partition_spec_id should be a u64"))?;
2040            } else {
2041                bail!("iceberg sink metadata should have partition_spec_id");
2042            }
2043
2044            let data_files: Vec<SerializedDataFile>;
2045            if let serde_json::Value::Array(values) = values
2046                .remove(DATA_FILES)
2047                .ok_or_else(|| anyhow!("iceberg sink metadata should have data_files object"))?
2048            {
2049                data_files = values
2050                    .into_iter()
2051                    .map(from_value::<SerializedDataFile>)
2052                    .collect::<std::result::Result<_, _>>()
2053                    .unwrap();
2054            } else {
2055                bail!("iceberg sink metadata should have data_files object");
2056            }
2057
2058            Ok(Self {
2059                schema_id: schema_id as i32,
2060                partition_spec_id: partition_spec_id as i32,
2061                data_files,
2062            })
2063        } else {
2064            bail!("Can't create iceberg sink write result from empty data!")
2065        }
2066    }
2067
2068    fn try_from_serialized_bytes(value: Vec<u8>) -> Result<Self> {
2069        let mut values = if let serde_json::Value::Object(value) =
2070            serde_json::from_slice::<serde_json::Value>(&value)
2071                .context("Can't parse iceberg sink metadata")?
2072        {
2073            value
2074        } else {
2075            bail!("iceberg sink metadata should be an object");
2076        };
2077
2078        let schema_id;
2079        if let Some(serde_json::Value::Number(value)) = values.remove(SCHEMA_ID) {
2080            schema_id = value
2081                .as_u64()
2082                .ok_or_else(|| anyhow!("schema_id should be a u64"))?;
2083        } else {
2084            bail!("iceberg sink metadata should have schema_id");
2085        }
2086
2087        let partition_spec_id;
2088        if let Some(serde_json::Value::Number(value)) = values.remove(PARTITION_SPEC_ID) {
2089            partition_spec_id = value
2090                .as_u64()
2091                .ok_or_else(|| anyhow!("partition_spec_id should be a u64"))?;
2092        } else {
2093            bail!("iceberg sink metadata should have partition_spec_id");
2094        }
2095
2096        let data_files: Vec<SerializedDataFile>;
2097        if let serde_json::Value::Array(values) = values
2098            .remove(DATA_FILES)
2099            .ok_or_else(|| anyhow!("iceberg sink metadata should have data_files object"))?
2100        {
2101            data_files = values
2102                .into_iter()
2103                .map(from_value::<SerializedDataFile>)
2104                .collect::<std::result::Result<_, _>>()
2105                .unwrap();
2106        } else {
2107            bail!("iceberg sink metadata should have data_files object");
2108        }
2109
2110        Ok(Self {
2111            schema_id: schema_id as i32,
2112            partition_spec_id: partition_spec_id as i32,
2113            data_files,
2114        })
2115    }
2116}
2117
2118impl<'a> TryFrom<&'a IcebergCommitResult> for SinkMetadata {
2119    type Error = SinkError;
2120
2121    fn try_from(value: &'a IcebergCommitResult) -> std::result::Result<SinkMetadata, Self::Error> {
2122        let json_data_files = serde_json::Value::Array(
2123            value
2124                .data_files
2125                .iter()
2126                .map(serde_json::to_value)
2127                .collect::<std::result::Result<Vec<serde_json::Value>, _>>()
2128                .context("Can't serialize data files to json")?,
2129        );
2130        let json_value = serde_json::Value::Object(
2131            vec![
2132                (
2133                    SCHEMA_ID.to_owned(),
2134                    serde_json::Value::Number(value.schema_id.into()),
2135                ),
2136                (
2137                    PARTITION_SPEC_ID.to_owned(),
2138                    serde_json::Value::Number(value.partition_spec_id.into()),
2139                ),
2140                (DATA_FILES.to_owned(), json_data_files),
2141            ]
2142            .into_iter()
2143            .collect(),
2144        );
2145        Ok(SinkMetadata {
2146            metadata: Some(Serialized(SerializedMetadata {
2147                metadata: serde_json::to_vec(&json_value)
2148                    .context("Can't serialize iceberg sink metadata")?,
2149            })),
2150        })
2151    }
2152}
2153
2154impl TryFrom<IcebergCommitResult> for Vec<u8> {
2155    type Error = SinkError;
2156
2157    fn try_from(value: IcebergCommitResult) -> std::result::Result<Vec<u8>, Self::Error> {
2158        let json_data_files = serde_json::Value::Array(
2159            value
2160                .data_files
2161                .iter()
2162                .map(serde_json::to_value)
2163                .collect::<std::result::Result<Vec<serde_json::Value>, _>>()
2164                .context("Can't serialize data files to json")?,
2165        );
2166        let json_value = serde_json::Value::Object(
2167            vec![
2168                (
2169                    SCHEMA_ID.to_owned(),
2170                    serde_json::Value::Number(value.schema_id.into()),
2171                ),
2172                (
2173                    PARTITION_SPEC_ID.to_owned(),
2174                    serde_json::Value::Number(value.partition_spec_id.into()),
2175                ),
2176                (DATA_FILES.to_owned(), json_data_files),
2177            ]
2178            .into_iter()
2179            .collect(),
2180        );
2181        Ok(serde_json::to_vec(&json_value).context("Can't serialize iceberg sink metadata")?)
2182    }
2183}
/// Coordinator-side committer: gathers per-writer commit results and commits
/// them to the iceberg table.
pub struct IcebergSinkCommitter {
    /// Catalog used to (re)load the table before each commit.
    catalog: Arc<dyn Catalog>,
    /// Cached table handle; refreshed via `reload_table` around commits.
    table: Table,
    /// Epoch of the last successful commit.
    pub last_commit_epoch: u64,
    pub(crate) sink_id: SinkId,
    pub(crate) config: IcebergConfig,
    pub(crate) param: SinkParam,
    /// Number of retries for a failed commit attempt.
    commit_retry_num: u32,
    /// Optional channel to notify the compaction manager after commits.
    pub(crate) iceberg_compact_stat_sender: Option<UnboundedSender<IcebergSinkCompactionUpdate>>,
}
2194
2195impl IcebergSinkCommitter {
2196    // Reload table and guarantee current schema_id and partition_spec_id matches
2197    // given `schema_id` and `partition_spec_id`
2198    async fn reload_table(
2199        catalog: &dyn Catalog,
2200        table_ident: &TableIdent,
2201        schema_id: i32,
2202        partition_spec_id: i32,
2203    ) -> Result<Table> {
2204        let table = catalog
2205            .load_table(table_ident)
2206            .await
2207            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
2208        if table.metadata().current_schema_id() != schema_id {
2209            return Err(SinkError::Iceberg(anyhow!(
2210                "Schema evolution not supported, expect schema id {}, but got {}",
2211                schema_id,
2212                table.metadata().current_schema_id()
2213            )));
2214        }
2215        if table.metadata().default_partition_spec_id() != partition_spec_id {
2216            return Err(SinkError::Iceberg(anyhow!(
2217                "Partition evolution not supported, expect partition spec id {}, but got {}",
2218                partition_spec_id,
2219                table.metadata().default_partition_spec_id()
2220            )));
2221        }
2222        Ok(table)
2223    }
2224}
2225
#[async_trait]
impl SinglePhaseCommitCoordinator for IcebergSinkCommitter {
    async fn init(&mut self) -> Result<()> {
        // No state to restore in the single-phase path; just log readiness.
        tracing::info!(
            sink_id = %self.param.sink_id,
            "Iceberg sink coordinator initialized",
        );

        Ok(())
    }

    /// Single-phase path: decode the per-writer results and commit them to the
    /// table directly, without a persisted pre-commit record.
    async fn commit_data(&mut self, epoch: u64, metadata: Vec<SinkMetadata>) -> Result<()> {
        tracing::debug!("Starting iceberg direct commit in epoch {epoch}");

        if metadata.is_empty() {
            tracing::debug!(?epoch, "No datafile to commit");
            return Ok(());
        }

        // Commit data if present (`pre_commit_inner` returns None when all
        // write results carry no data files).
        if let Some((write_results, snapshot_id)) = self.pre_commit_inner(epoch, metadata)? {
            self.commit_data_impl(epoch, write_results, snapshot_id)
                .await?;
        }

        Ok(())
    }

    async fn commit_schema_change(
        &mut self,
        epoch: u64,
        schema_change: PbSinkSchemaChange,
    ) -> Result<()> {
        tracing::info!(
            "Committing schema change {:?} in epoch {}",
            schema_change,
            epoch
        );
        self.commit_schema_change_impl(schema_change).await?;
        tracing::info!("Successfully committed schema change in epoch {}", epoch);

        Ok(())
    }
}
2270
#[async_trait]
impl TwoPhaseCommitCoordinator for IcebergSinkCommitter {
    async fn init(&mut self) -> Result<()> {
        tracing::info!(
            sink_id = %self.param.sink_id,
            "Iceberg sink coordinator initialized",
        );

        Ok(())
    }

    /// Phase 1: serialize the decoded write results plus a pre-generated
    /// snapshot id into an opaque byte blob that will be durably logged before
    /// `commit_data` runs. Returns `None` when there is nothing to commit.
    async fn pre_commit(
        &mut self,
        epoch: u64,
        metadata: Vec<SinkMetadata>,
        _schema_change: Option<PbSinkSchemaChange>,
    ) -> Result<Option<Vec<u8>>> {
        tracing::debug!("Starting iceberg pre commit in epoch {epoch}");

        let (write_results, snapshot_id) = match self.pre_commit_inner(epoch, metadata)? {
            Some((write_results, snapshot_id)) => (write_results, snapshot_id),
            None => {
                tracing::debug!(?epoch, "no data to pre commit");
                return Ok(None);
            }
        };

        // Layout: one JSON blob per parallelism, followed by the snapshot id
        // as the LAST element (8 little-endian bytes). `commit_data` relies on
        // this ordering.
        let mut write_results_bytes = Vec::new();
        for each_parallelism_write_result in write_results {
            let each_parallelism_write_result_bytes: Vec<u8> =
                each_parallelism_write_result.try_into()?;
            write_results_bytes.push(each_parallelism_write_result_bytes);
        }

        let snapshot_id_bytes: Vec<u8> = snapshot_id.to_le_bytes().to_vec();
        write_results_bytes.push(snapshot_id_bytes);

        let pre_commit_metadata_bytes: Vec<u8> = serialize_metadata(write_results_bytes);
        Ok(Some(pre_commit_metadata_bytes))
    }

    /// Phase 2: decode the blob produced by `pre_commit` and commit it. Safe
    /// to re-run after a crash: the pre-generated snapshot id is checked
    /// against the table to detect an already-applied commit (idempotence).
    async fn commit_data(&mut self, epoch: u64, commit_metadata: Vec<u8>) -> Result<()> {
        tracing::debug!("Starting iceberg commit in epoch {epoch}");

        if commit_metadata.is_empty() {
            tracing::debug!(?epoch, "No datafile to commit");
            return Ok(());
        }

        // Deserialize commit metadata
        let mut payload = deserialize_metadata(commit_metadata);
        if payload.is_empty() {
            return Err(SinkError::Iceberg(anyhow!(
                "Invalid commit metadata: empty payload"
            )));
        }

        // Last element is snapshot_id (see `pre_commit` for the layout).
        let snapshot_id_bytes = payload.pop().ok_or_else(|| {
            SinkError::Iceberg(anyhow!("Invalid commit metadata: missing snapshot_id"))
        })?;
        let snapshot_id = i64::from_le_bytes(
            snapshot_id_bytes
                .try_into()
                .map_err(|_| SinkError::Iceberg(anyhow!("Invalid snapshot id bytes")))?,
        );

        // Remaining elements are write_results
        let write_results = payload
            .into_iter()
            .map(IcebergCommitResult::try_from_serialized_bytes)
            .collect::<Result<Vec<_>>>()?;

        // Idempotence check: skip if this snapshot was already committed
        // (e.g. a retry after a crash between commit and acknowledgement).
        let snapshot_committed = self
            .is_snapshot_id_in_iceberg(&self.config, snapshot_id)
            .await?;

        if snapshot_committed {
            tracing::info!(
                "Snapshot id {} already committed in iceberg table, skip committing again.",
                snapshot_id
            );
            return Ok(());
        }

        self.commit_data_impl(epoch, write_results, snapshot_id)
            .await
    }

    async fn commit_schema_change(
        &mut self,
        epoch: u64,
        schema_change: PbSinkSchemaChange,
    ) -> Result<()> {
        // Idempotence check mirroring `commit_data`: schema changes may also be
        // replayed after recovery.
        let schema_updated = self.check_schema_change_applied(&schema_change)?;
        if schema_updated {
            tracing::info!("Schema change already committed in epoch {}, skip", epoch);
            return Ok(());
        }

        tracing::info!(
            "Committing schema change {:?} in epoch {}",
            schema_change,
            epoch
        );
        self.commit_schema_change_impl(schema_change).await?;
        tracing::info!("Successfully committed schema change in epoch {epoch}");

        Ok(())
    }

    async fn abort(&mut self, _epoch: u64, _commit_metadata: Vec<u8>) {
        // TODO: Files that have been written but not committed should be deleted.
        tracing::debug!("Abort not implemented yet");
    }
}
2387
2388/// Methods Required to Achieve Exactly Once Semantics
2389impl IcebergSinkCommitter {
2390    fn pre_commit_inner(
2391        &mut self,
2392        _epoch: u64,
2393        metadata: Vec<SinkMetadata>,
2394    ) -> Result<Option<(Vec<IcebergCommitResult>, i64)>> {
2395        let write_results: Vec<IcebergCommitResult> = metadata
2396            .iter()
2397            .map(IcebergCommitResult::try_from)
2398            .collect::<Result<Vec<IcebergCommitResult>>>()?;
2399
2400        // Skip if no data to commit
2401        if write_results.is_empty() || write_results.iter().all(|r| r.data_files.is_empty()) {
2402            return Ok(None);
2403        }
2404
2405        let expect_schema_id = write_results[0].schema_id;
2406        let expect_partition_spec_id = write_results[0].partition_spec_id;
2407
2408        // guarantee that all write results has same schema_id and partition_spec_id
2409        if write_results
2410            .iter()
2411            .any(|r| r.schema_id != expect_schema_id)
2412            || write_results
2413                .iter()
2414                .any(|r| r.partition_spec_id != expect_partition_spec_id)
2415        {
2416            return Err(SinkError::Iceberg(anyhow!(
2417                "schema_id and partition_spec_id should be the same in all write results"
2418            )));
2419        }
2420
2421        let snapshot_id = FastAppendAction::generate_snapshot_id(&self.table);
2422
2423        Ok(Some((write_results, snapshot_id)))
2424    }
2425
2426    async fn commit_data_impl(
2427        &mut self,
2428        epoch: u64,
2429        write_results: Vec<IcebergCommitResult>,
2430        snapshot_id: i64,
2431    ) -> Result<()> {
2432        // Empty write results should be handled before calling this function.
2433        assert!(
2434            !write_results.is_empty() && !write_results.iter().all(|r| r.data_files.is_empty())
2435        );
2436
2437        // Check snapshot limit before proceeding with commit
2438        self.wait_for_snapshot_limit().await?;
2439
2440        let expect_schema_id = write_results[0].schema_id;
2441        let expect_partition_spec_id = write_results[0].partition_spec_id;
2442
2443        // Load the latest table to avoid concurrent modification with the best effort.
2444        self.table = Self::reload_table(
2445            self.catalog.as_ref(),
2446            self.table.identifier(),
2447            expect_schema_id,
2448            expect_partition_spec_id,
2449        )
2450        .await?;
2451
2452        let Some(schema) = self.table.metadata().schema_by_id(expect_schema_id) else {
2453            return Err(SinkError::Iceberg(anyhow!(
2454                "Can't find schema by id {}",
2455                expect_schema_id
2456            )));
2457        };
2458        let Some(partition_spec) = self
2459            .table
2460            .metadata()
2461            .partition_spec_by_id(expect_partition_spec_id)
2462        else {
2463            return Err(SinkError::Iceberg(anyhow!(
2464                "Can't find partition spec by id {}",
2465                expect_partition_spec_id
2466            )));
2467        };
2468        let partition_type = partition_spec
2469            .as_ref()
2470            .clone()
2471            .partition_type(schema)
2472            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
2473
2474        let data_files = write_results
2475            .into_iter()
2476            .flat_map(|r| {
2477                r.data_files.into_iter().map(|f| {
2478                    f.try_into(expect_partition_spec_id, &partition_type, schema)
2479                        .map_err(|err| SinkError::Iceberg(anyhow!(err)))
2480                })
2481            })
2482            .collect::<Result<Vec<DataFile>>>()?;
2483        // # TODO:
2484        // This retry behavior should be revert and do in iceberg-rust when it supports retry(Track in: https://github.com/apache/iceberg-rust/issues/964)
2485        // because retry logic involved reapply the commit metadata.
2486        // For now, we just retry the commit operation.
2487        let retry_strategy = ExponentialBackoff::from_millis(10)
2488            .max_delay(Duration::from_secs(60))
2489            .map(jitter)
2490            .take(self.commit_retry_num as usize);
2491        let catalog = self.catalog.clone();
2492        let table_ident = self.table.identifier().clone();
2493
2494        // Custom retry logic that:
2495        // 1. Calls reload_table before each commit attempt to get the latest metadata
2496        // 2. If reload_table fails (table not exists/schema/partition mismatch), stops retrying immediately
2497        // 3. If commit fails, retries with backoff
2498        enum CommitError {
2499            ReloadTable(SinkError), // Non-retriable: schema/partition mismatch
2500            Commit(SinkError),      // Retriable: commit conflicts, network errors
2501        }
2502
2503        let table = RetryIf::spawn(
2504            retry_strategy,
2505            || async {
2506                // Reload table before each commit attempt to get the latest metadata
2507                let table = Self::reload_table(
2508                    catalog.as_ref(),
2509                    &table_ident,
2510                    expect_schema_id,
2511                    expect_partition_spec_id,
2512                )
2513                .await
2514                .map_err(|e| {
2515                    tracing::error!(error = %e.as_report(), "Failed to reload iceberg table");
2516                    CommitError::ReloadTable(e)
2517                })?;
2518
2519                let txn = Transaction::new(&table);
2520                let append_action = txn
2521                    .fast_append()
2522                    .set_snapshot_id(snapshot_id)
2523                    .set_target_branch(commit_branch(
2524                        self.config.r#type.as_str(),
2525                        self.config.write_mode,
2526                    ))
2527                    .add_data_files(data_files.clone());
2528
2529                let tx = append_action.apply(txn).map_err(|err| {
2530                    let err: IcebergError = err.into();
2531                    tracing::error!(error = %err.as_report(), "Failed to apply iceberg table");
2532                    CommitError::Commit(SinkError::Iceberg(anyhow!(err)))
2533                })?;
2534
2535                tx.commit(catalog.as_ref()).await.map_err(|err| {
2536                    let err: IcebergError = err.into();
2537                    tracing::error!(error = %err.as_report(), "Failed to commit iceberg table");
2538                    CommitError::Commit(SinkError::Iceberg(anyhow!(err)))
2539                })
2540            },
2541            |err: &CommitError| {
2542                // Only retry on commit errors, not on reload_table errors
2543                match err {
2544                    CommitError::Commit(_) => {
2545                        tracing::warn!("Commit failed, will retry");
2546                        true
2547                    }
2548                    CommitError::ReloadTable(_) => {
2549                        tracing::error!(
2550                            "reload_table failed with non-retriable error, will not retry"
2551                        );
2552                        false
2553                    }
2554                }
2555            },
2556        )
2557        .await
2558        .map_err(|e| match e {
2559            CommitError::ReloadTable(e) | CommitError::Commit(e) => e,
2560        })?;
2561        self.table = table;
2562
2563        let snapshot_num = self.table.metadata().snapshots().count();
2564        let catalog_name = self.config.common.catalog_name();
2565        let table_name = self.table.identifier().to_string();
2566        let metrics_labels = [&self.param.sink_name, &catalog_name, &table_name];
2567        GLOBAL_SINK_METRICS
2568            .iceberg_snapshot_num
2569            .with_guarded_label_values(&metrics_labels)
2570            .set(snapshot_num as i64);
2571
2572        tracing::debug!("Succeeded to commit to iceberg table in epoch {epoch}.");
2573
2574        if let Some(iceberg_compact_stat_sender) = &self.iceberg_compact_stat_sender
2575            && self.config.enable_compaction
2576            && iceberg_compact_stat_sender
2577                .send(IcebergSinkCompactionUpdate {
2578                    sink_id: self.sink_id,
2579                    compaction_interval: self.config.compaction_interval_sec(),
2580                    force_compaction: false,
2581                })
2582                .is_err()
2583        {
2584            warn!("failed to send iceberg compaction stats");
2585        }
2586
2587        Ok(())
2588    }
2589
2590    /// During pre-commit metadata, we record the `snapshot_id` corresponding to each batch of files.
2591    /// Therefore, the logic for checking whether all files in this batch are present in Iceberg
2592    /// has been changed to verifying if their corresponding `snapshot_id` exists in Iceberg.
2593    async fn is_snapshot_id_in_iceberg(
2594        &self,
2595        iceberg_config: &IcebergConfig,
2596        snapshot_id: i64,
2597    ) -> Result<bool> {
2598        let table = iceberg_config.load_table().await?;
2599        if table.metadata().snapshot_by_id(snapshot_id).is_some() {
2600            Ok(true)
2601        } else {
2602            Ok(false)
2603        }
2604    }
2605
2606    /// Check if the specified columns already exist in the iceberg table's current schema.
2607    /// This is used to determine if schema change has already been applied.
2608    fn check_schema_change_applied(&self, schema_change: &PbSinkSchemaChange) -> Result<bool> {
2609        let current_schema = self.table.metadata().current_schema();
2610        let current_arrow_schema = schema_to_arrow_schema(current_schema.as_ref())
2611            .context("Failed to convert schema")
2612            .map_err(SinkError::Iceberg)?;
2613
2614        let iceberg_arrow_convert = IcebergArrowConvert;
2615
2616        let schema_matches = |expected: &[ArrowField]| {
2617            if current_arrow_schema.fields().len() != expected.len() {
2618                return false;
2619            }
2620
2621            expected.iter().all(|expected_field| {
2622                current_arrow_schema.fields().iter().any(|current_field| {
2623                    current_field.name() == expected_field.name()
2624                        && current_field.data_type() == expected_field.data_type()
2625                })
2626            })
2627        };
2628
2629        let original_arrow_fields: Vec<ArrowField> = schema_change
2630            .original_schema
2631            .iter()
2632            .map(|pb_field| {
2633                let field = Field::from(pb_field);
2634                iceberg_arrow_convert
2635                    .to_arrow_field(&field.name, &field.data_type)
2636                    .context("Failed to convert field to arrow")
2637                    .map_err(SinkError::Iceberg)
2638            })
2639            .collect::<Result<_>>()?;
2640
2641        // If current schema equals original_schema, then schema change is NOT applied.
2642        if schema_matches(&original_arrow_fields) {
2643            tracing::debug!(
2644                "Current iceberg schema matches original_schema ({} columns); schema change not applied",
2645                original_arrow_fields.len()
2646            );
2647            return Ok(false);
2648        }
2649
2650        // We only support add_columns for now.
2651        let Some(risingwave_pb::stream_plan::sink_schema_change::Op::AddColumns(add_columns_op)) =
2652            schema_change.op.as_ref()
2653        else {
2654            return Err(SinkError::Iceberg(anyhow!(
2655                "Unsupported sink schema change op in iceberg sink: {:?}",
2656                schema_change.op
2657            )));
2658        };
2659
2660        let add_arrow_fields: Vec<ArrowField> = add_columns_op
2661            .fields
2662            .iter()
2663            .map(|pb_field| {
2664                let field = Field::from(pb_field);
2665                iceberg_arrow_convert
2666                    .to_arrow_field(&field.name, &field.data_type)
2667                    .context("Failed to convert field to arrow")
2668                    .map_err(SinkError::Iceberg)
2669            })
2670            .collect::<Result<_>>()?;
2671
2672        let mut expected_after_change = original_arrow_fields;
2673        expected_after_change.extend(add_arrow_fields);
2674
2675        // If current schema equals original_schema + add_columns, then schema change is applied.
2676        if schema_matches(&expected_after_change) {
2677            tracing::debug!(
2678                "Current iceberg schema matches original_schema + add_columns ({} columns); schema change already applied",
2679                expected_after_change.len()
2680            );
2681            return Ok(true);
2682        }
2683
2684        Err(SinkError::Iceberg(anyhow!(
2685            "Current iceberg schema does not match either original_schema ({} cols) or original_schema + add_columns; cannot determine whether schema change is applied",
2686            schema_change.original_schema.len()
2687        )))
2688    }
2689
2690    /// Commit schema changes (e.g., add columns) to the iceberg table.
2691    /// This function uses Transaction API to atomically update the table schema
2692    /// with optimistic locking to prevent concurrent conflicts.
2693    async fn commit_schema_change_impl(&mut self, schema_change: PbSinkSchemaChange) -> Result<()> {
2694        use iceberg::spec::NestedField;
2695
2696        let Some(risingwave_pb::stream_plan::sink_schema_change::Op::AddColumns(add_columns_op)) =
2697            schema_change.op.as_ref()
2698        else {
2699            return Err(SinkError::Iceberg(anyhow!(
2700                "Unsupported sink schema change op in iceberg sink: {:?}",
2701                schema_change.op
2702            )));
2703        };
2704
2705        let add_columns = add_columns_op.fields.iter().map(Field::from).collect_vec();
2706
2707        // Step 1: Get current table metadata
2708        let metadata = self.table.metadata();
2709        let mut next_field_id = metadata.last_column_id() + 1;
2710        tracing::debug!("Starting schema change, next_field_id: {}", next_field_id);
2711
2712        // Step 2: Build new fields to add
2713        let iceberg_create_table_arrow_convert = IcebergCreateTableArrowConvert::default();
2714        let mut new_fields = Vec::new();
2715
2716        for field in &add_columns {
2717            // Convert RisingWave Field to Arrow Field using IcebergCreateTableArrowConvert
2718            let arrow_field = iceberg_create_table_arrow_convert
2719                .to_arrow_field(&field.name, &field.data_type)
2720                .with_context(|| format!("Failed to convert field '{}' to arrow", field.name))
2721                .map_err(SinkError::Iceberg)?;
2722
2723            // Convert Arrow DataType to Iceberg Type
2724            let iceberg_type = iceberg::arrow::arrow_type_to_type(arrow_field.data_type())
2725                .map_err(|err| {
2726                    SinkError::Iceberg(
2727                        anyhow!(err).context("Failed to convert Arrow type to Iceberg type"),
2728                    )
2729                })?;
2730
2731            // Create NestedField with the next available field ID
2732            let nested_field = Arc::new(NestedField::optional(
2733                next_field_id,
2734                &field.name,
2735                iceberg_type,
2736            ));
2737
2738            new_fields.push(nested_field);
2739            tracing::info!("Prepared field '{}' with ID {}", field.name, next_field_id);
2740            next_field_id += 1;
2741        }
2742
2743        // Step 3: Create Transaction with UpdateSchemaAction
2744        tracing::info!(
2745            "Committing schema change to catalog for table {}",
2746            self.table.identifier()
2747        );
2748
2749        let txn = Transaction::new(&self.table);
2750        let action = txn.update_schema().add_fields(new_fields);
2751
2752        let updated_table = action
2753            .apply(txn)
2754            .context("Failed to apply schema update action")
2755            .map_err(SinkError::Iceberg)?
2756            .commit(self.catalog.as_ref())
2757            .await
2758            .context("Failed to commit table schema change")
2759            .map_err(SinkError::Iceberg)?;
2760
2761        self.table = updated_table;
2762
2763        tracing::info!(
2764            "Successfully committed schema change, added {} columns to iceberg table",
2765            add_columns.len()
2766        );
2767
2768        Ok(())
2769    }
2770
2771    /// Check if the number of snapshots since the last rewrite/overwrite operation exceeds the limit
2772    /// Returns the number of snapshots since the last rewrite/overwrite
2773    fn count_snapshots_since_rewrite(&self) -> usize {
2774        let mut snapshots: Vec<_> = self.table.metadata().snapshots().collect();
2775        snapshots.sort_by_key(|b| std::cmp::Reverse(b.timestamp_ms()));
2776
2777        // Iterate through snapshots in reverse order (newest first) to find the last rewrite/overwrite
2778        let mut count = 0;
2779        for snapshot in snapshots {
2780            // Check if this snapshot represents a rewrite or overwrite operation
2781            let summary = snapshot.summary();
2782            match &summary.operation {
2783                Operation::Replace => {
2784                    // Found a rewrite/overwrite operation, stop counting
2785                    break;
2786                }
2787
2788                _ => {
2789                    // Increment count for each snapshot that is not a rewrite/overwrite
2790                    count += 1;
2791                }
2792            }
2793        }
2794
2795        count
2796    }
2797
2798    /// Wait until snapshot count since last rewrite is below the limit
2799    async fn wait_for_snapshot_limit(&mut self) -> Result<()> {
2800        if !self.config.enable_compaction {
2801            return Ok(());
2802        }
2803
2804        if let Some(max_snapshots) = self.config.max_snapshots_num_before_compaction {
2805            loop {
2806                let current_count = self.count_snapshots_since_rewrite();
2807
2808                if current_count < max_snapshots {
2809                    tracing::info!(
2810                        "Snapshot count check passed: {} < {}",
2811                        current_count,
2812                        max_snapshots
2813                    );
2814                    break;
2815                }
2816
2817                tracing::info!(
2818                    "Snapshot count {} exceeds limit {}, waiting...",
2819                    current_count,
2820                    max_snapshots
2821                );
2822
2823                if let Some(iceberg_compact_stat_sender) = &self.iceberg_compact_stat_sender
2824                    && iceberg_compact_stat_sender
2825                        .send(IcebergSinkCompactionUpdate {
2826                            sink_id: self.sink_id,
2827                            compaction_interval: self.config.compaction_interval_sec(),
2828                            force_compaction: true,
2829                        })
2830                        .is_err()
2831                {
2832                    tracing::warn!("failed to send iceberg compaction stats");
2833                }
2834
2835                // Wait for 30 seconds before checking again
2836                tokio::time::sleep(Duration::from_secs(30)).await;
2837
2838                // Reload table to get latest snapshots
2839                self.table = self.config.load_table().await?;
2840            }
2841        }
2842        Ok(())
2843    }
2844}
2845
/// Arrow field name for the key part of a map entry.
const MAP_KEY: &str = "key";
/// Arrow field name for the value part of a map entry.
const MAP_VALUE: &str = "value";
2848
2849fn get_fields<'a>(
2850    our_field_type: &'a risingwave_common::types::DataType,
2851    data_type: &ArrowDataType,
2852    schema_fields: &mut HashMap<&'a str, &'a risingwave_common::types::DataType>,
2853) -> Option<ArrowFields> {
2854    match data_type {
2855        ArrowDataType::Struct(fields) => {
2856            match our_field_type {
2857                risingwave_common::types::DataType::Struct(struct_fields) => {
2858                    struct_fields.iter().for_each(|(name, data_type)| {
2859                        let res = schema_fields.insert(name, data_type);
2860                        // This assert is to make sure there is no duplicate field name in the schema.
2861                        assert!(res.is_none())
2862                    });
2863                }
2864                risingwave_common::types::DataType::Map(map_fields) => {
2865                    schema_fields.insert(MAP_KEY, map_fields.key());
2866                    schema_fields.insert(MAP_VALUE, map_fields.value());
2867                }
2868                risingwave_common::types::DataType::List(list) => {
2869                    list.elem()
2870                        .as_struct()
2871                        .iter()
2872                        .for_each(|(name, data_type)| {
2873                            let res = schema_fields.insert(name, data_type);
2874                            // This assert is to make sure there is no duplicate field name in the schema.
2875                            assert!(res.is_none())
2876                        });
2877                }
2878                _ => {}
2879            };
2880            Some(fields.clone())
2881        }
2882        ArrowDataType::List(field) | ArrowDataType::Map(field, _) => {
2883            get_fields(our_field_type, field.data_type(), schema_fields)
2884        }
2885        _ => None, // not a supported complex type and unlikely to show up
2886    }
2887}
2888
2889fn check_compatibility(
2890    schema_fields: HashMap<&str, &risingwave_common::types::DataType>,
2891    fields: &ArrowFields,
2892) -> anyhow::Result<bool> {
2893    for arrow_field in fields {
2894        let our_field_type = schema_fields
2895            .get(arrow_field.name().as_str())
2896            .ok_or_else(|| anyhow!("Field {} not found in our schema", arrow_field.name()))?;
2897
2898        // Iceberg source should be able to read iceberg decimal type.
2899        let converted_arrow_data_type = IcebergArrowConvert
2900            .to_arrow_field("", our_field_type)
2901            .map_err(|e| anyhow!(e))?
2902            .data_type()
2903            .clone();
2904
2905        let compatible = match (&converted_arrow_data_type, arrow_field.data_type()) {
2906            (ArrowDataType::Decimal128(_, _), ArrowDataType::Decimal128(_, _)) => true,
2907            (ArrowDataType::Binary, ArrowDataType::LargeBinary) => true,
2908            (ArrowDataType::LargeBinary, ArrowDataType::Binary) => true,
2909            (ArrowDataType::List(_), ArrowDataType::List(field))
2910            | (ArrowDataType::Map(_, _), ArrowDataType::Map(field, _)) => {
2911                let mut schema_fields = HashMap::new();
2912                get_fields(our_field_type, field.data_type(), &mut schema_fields)
2913                    .is_none_or(|fields| check_compatibility(schema_fields, &fields).unwrap())
2914            }
2915            // validate nested structs
2916            (ArrowDataType::Struct(_), ArrowDataType::Struct(fields)) => {
2917                let mut schema_fields = HashMap::new();
2918                our_field_type
2919                    .as_struct()
2920                    .iter()
2921                    .for_each(|(name, data_type)| {
2922                        let res = schema_fields.insert(name, data_type);
2923                        // This assert is to make sure there is no duplicate field name in the schema.
2924                        assert!(res.is_none())
2925                    });
2926                check_compatibility(schema_fields, fields)?
2927            }
2928            // cases where left != right (metadata, field name mismatch)
2929            //
2930            // all nested types: in iceberg `field_id` will always be present, but RW doesn't have it:
2931            // {"PARQUET:field_id": ".."}
2932            //
2933            // map: The standard name in arrow is "entries", "key", "value".
2934            // in iceberg-rs, it's called "key_value"
2935            (left, right) => left.equals_datatype(right),
2936        };
2937        if !compatible {
2938            bail!(
2939                "field {}'s type is incompatible\nRisingWave converted data type: {}\niceberg's data type: {}",
2940                arrow_field.name(),
2941                converted_arrow_data_type,
2942                arrow_field.data_type()
2943            );
2944        }
2945    }
2946    Ok(true)
2947}
2948
2949/// Try to match our schema with iceberg schema.
2950pub fn try_matches_arrow_schema(rw_schema: &Schema, arrow_schema: &ArrowSchema) -> Result<()> {
2951    if rw_schema.fields.len() != arrow_schema.fields().len() {
2952        bail!(
2953            "Schema length mismatch, risingwave is {}, and iceberg is {}",
2954            rw_schema.fields.len(),
2955            arrow_schema.fields.len()
2956        );
2957    }
2958
2959    let mut schema_fields = HashMap::new();
2960    rw_schema.fields.iter().for_each(|field| {
2961        let res = schema_fields.insert(field.name.as_str(), &field.data_type);
2962        // This assert is to make sure there is no duplicate field name in the schema.
2963        assert!(res.is_none())
2964    });
2965
2966    check_compatibility(schema_fields, &arrow_schema.fields)?;
2967    Ok(())
2968}
2969
/// Serialize a batch of per-writer metadata blobs into one JSON byte vector.
/// `Vec<Vec<u8>>` always serializes successfully, so the `unwrap` cannot fire.
fn serialize_metadata(metadata: Vec<Vec<u8>>) -> Vec<u8> {
    serde_json::to_vec(&metadata).unwrap()
}
2973
/// Inverse of `serialize_metadata`; panics if `bytes` is not the JSON shape
/// produced there (indicates corrupted commit metadata).
fn deserialize_metadata(bytes: Vec<u8>) -> Vec<Vec<u8>> {
    serde_json::from_slice(&bytes).unwrap()
}
2977
2978pub fn parse_partition_by_exprs(
2979    expr: String,
2980) -> std::result::Result<Vec<(String, Transform)>, anyhow::Error> {
2981    // captures column, transform(column), transform(n,column), transform(n, column)
2982    let re = Regex::new(r"(?<transform>\w+)(\(((?<n>\d+)?(?:,|(,\s)))?(?<field>\w+)\))?").unwrap();
2983    if !re.is_match(&expr) {
2984        bail!(format!(
2985            "Invalid partition fields: {}\nHINT: Supported formats are column, transform(column), transform(n,column), transform(n, column)",
2986            expr
2987        ))
2988    }
2989    let caps = re.captures_iter(&expr);
2990
2991    let mut partition_columns = vec![];
2992
2993    for mat in caps {
2994        let (column, transform) = if mat.name("n").is_none() && mat.name("field").is_none() {
2995            (&mat["transform"], Transform::Identity)
2996        } else {
2997            let mut func = mat["transform"].to_owned();
2998            if func == "bucket" || func == "truncate" {
2999                let n = &mat
3000                    .name("n")
3001                    .ok_or_else(|| anyhow!("The `n` must be set with `bucket` and `truncate`"))?
3002                    .as_str();
3003                func = format!("{func}[{n}]");
3004            }
3005            (
3006                &mat["field"],
3007                Transform::from_str(&func)
3008                    .with_context(|| format!("invalid transform function {}", func))?,
3009            )
3010        };
3011        partition_columns.push((column.to_owned(), transform));
3012    }
3013    Ok(partition_columns)
3014}
3015
3016pub fn commit_branch(sink_type: &str, write_mode: IcebergWriteMode) -> String {
3017    if should_enable_iceberg_cow(sink_type, write_mode) {
3018        ICEBERG_COW_BRANCH.to_owned()
3019    } else {
3020        MAIN_BRANCH.to_owned()
3021    }
3022}
3023
/// Copy-on-write is enabled only for upsert sinks configured with the
/// copy-on-write write mode.
pub fn should_enable_iceberg_cow(sink_type: &str, write_mode: IcebergWriteMode) -> bool {
    sink_type == SINK_TYPE_UPSERT && write_mode == IcebergWriteMode::CopyOnWrite
}
3027
// Marker impls so these enum-valued options can be used in `WITH` clauses;
// they carry no nested options of their own.
impl crate::with_options::WithOptions for IcebergWriteMode {}

impl crate::with_options::WithOptions for FormatVersion {}

impl crate::with_options::WithOptions for CompactionType {}
3033
3034#[cfg(test)]
3035mod test {
3036    use std::collections::BTreeMap;
3037
3038    use risingwave_common::array::arrow::arrow_schema_iceberg::FieldRef as ArrowFieldRef;
3039    use risingwave_common::types::{DataType, MapType, StructType};
3040
3041    use crate::connector_common::{IcebergCommon, IcebergTableIdentifier};
3042    use crate::sink::decouple_checkpoint_log_sink::ICEBERG_DEFAULT_COMMIT_CHECKPOINT_INTERVAL;
3043    use crate::sink::iceberg::{
3044        COMMIT_CHECKPOINT_SIZE_THRESHOLD_MB, COMPACTION_INTERVAL_SEC, COMPACTION_MAX_SNAPSHOTS_NUM,
3045        CompactionType, ENABLE_COMPACTION, ENABLE_SNAPSHOT_EXPIRATION, FormatVersion,
3046        ICEBERG_DEFAULT_COMMIT_CHECKPOINT_SIZE_THRESHOLD_MB, IcebergConfig, IcebergWriteMode,
3047        SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_FILES, SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_META_DATA,
3048        SNAPSHOT_EXPIRATION_MAX_AGE_MILLIS, SNAPSHOT_EXPIRATION_RETAIN_LAST, WRITE_MODE,
3049    };
3050
3051    pub const DEFAULT_ICEBERG_COMPACTION_INTERVAL: u64 = 3600; // 1 hour
3052
3053    #[test]
3054    fn test_compatible_arrow_schema() {
3055        use arrow_schema_iceberg::{DataType as ArrowDataType, Field as ArrowField};
3056
3057        use super::*;
3058        let risingwave_schema = Schema::new(vec![
3059            Field::with_name(DataType::Int32, "a"),
3060            Field::with_name(DataType::Int32, "b"),
3061            Field::with_name(DataType::Int32, "c"),
3062        ]);
3063        let arrow_schema = ArrowSchema::new(vec![
3064            ArrowField::new("a", ArrowDataType::Int32, false),
3065            ArrowField::new("b", ArrowDataType::Int32, false),
3066            ArrowField::new("c", ArrowDataType::Int32, false),
3067        ]);
3068
3069        try_matches_arrow_schema(&risingwave_schema, &arrow_schema).unwrap();
3070
3071        let risingwave_schema = Schema::new(vec![
3072            Field::with_name(DataType::Int32, "d"),
3073            Field::with_name(DataType::Int32, "c"),
3074            Field::with_name(DataType::Int32, "a"),
3075            Field::with_name(DataType::Int32, "b"),
3076        ]);
3077        let arrow_schema = ArrowSchema::new(vec![
3078            ArrowField::new("a", ArrowDataType::Int32, false),
3079            ArrowField::new("b", ArrowDataType::Int32, false),
3080            ArrowField::new("d", ArrowDataType::Int32, false),
3081            ArrowField::new("c", ArrowDataType::Int32, false),
3082        ]);
3083        try_matches_arrow_schema(&risingwave_schema, &arrow_schema).unwrap();
3084
3085        let risingwave_schema = Schema::new(vec![
3086            Field::with_name(
3087                DataType::Struct(StructType::new(vec![
3088                    ("a1", DataType::Int32),
3089                    (
3090                        "a2",
3091                        DataType::Struct(StructType::new(vec![
3092                            ("a21", DataType::Bytea),
3093                            (
3094                                "a22",
3095                                DataType::Map(MapType::from_kv(DataType::Varchar, DataType::Jsonb)),
3096                            ),
3097                        ])),
3098                    ),
3099                ])),
3100                "a",
3101            ),
3102            Field::with_name(
3103                DataType::list(DataType::Struct(StructType::new(vec![
3104                    ("b1", DataType::Int32),
3105                    ("b2", DataType::Bytea),
3106                    (
3107                        "b3",
3108                        DataType::Map(MapType::from_kv(DataType::Varchar, DataType::Jsonb)),
3109                    ),
3110                ]))),
3111                "b",
3112            ),
3113            Field::with_name(
3114                DataType::Map(MapType::from_kv(
3115                    DataType::Varchar,
3116                    DataType::list(DataType::Struct(StructType::new([
3117                        ("c1", DataType::Int32),
3118                        ("c2", DataType::Bytea),
3119                        (
3120                            "c3",
3121                            DataType::Map(MapType::from_kv(DataType::Varchar, DataType::Jsonb)),
3122                        ),
3123                    ]))),
3124                )),
3125                "c",
3126            ),
3127        ]);
3128        let arrow_schema = ArrowSchema::new(vec![
3129            ArrowField::new(
3130                "a",
3131                ArrowDataType::Struct(ArrowFields::from(vec![
3132                    ArrowField::new("a1", ArrowDataType::Int32, false),
3133                    ArrowField::new(
3134                        "a2",
3135                        ArrowDataType::Struct(ArrowFields::from(vec![
3136                            ArrowField::new("a21", ArrowDataType::LargeBinary, false),
3137                            ArrowField::new_map(
3138                                "a22",
3139                                "entries",
3140                                ArrowFieldRef::new(ArrowField::new(
3141                                    "key",
3142                                    ArrowDataType::Utf8,
3143                                    false,
3144                                )),
3145                                ArrowFieldRef::new(ArrowField::new(
3146                                    "value",
3147                                    ArrowDataType::Utf8,
3148                                    false,
3149                                )),
3150                                false,
3151                                false,
3152                            ),
3153                        ])),
3154                        false,
3155                    ),
3156                ])),
3157                false,
3158            ),
3159            ArrowField::new(
3160                "b",
3161                ArrowDataType::List(ArrowFieldRef::new(ArrowField::new_list_field(
3162                    ArrowDataType::Struct(ArrowFields::from(vec![
3163                        ArrowField::new("b1", ArrowDataType::Int32, false),
3164                        ArrowField::new("b2", ArrowDataType::LargeBinary, false),
3165                        ArrowField::new_map(
3166                            "b3",
3167                            "entries",
3168                            ArrowFieldRef::new(ArrowField::new("key", ArrowDataType::Utf8, false)),
3169                            ArrowFieldRef::new(ArrowField::new(
3170                                "value",
3171                                ArrowDataType::Utf8,
3172                                false,
3173                            )),
3174                            false,
3175                            false,
3176                        ),
3177                    ])),
3178                    false,
3179                ))),
3180                false,
3181            ),
3182            ArrowField::new_map(
3183                "c",
3184                "entries",
3185                ArrowFieldRef::new(ArrowField::new("key", ArrowDataType::Utf8, false)),
3186                ArrowFieldRef::new(ArrowField::new(
3187                    "value",
3188                    ArrowDataType::List(ArrowFieldRef::new(ArrowField::new_list_field(
3189                        ArrowDataType::Struct(ArrowFields::from(vec![
3190                            ArrowField::new("c1", ArrowDataType::Int32, false),
3191                            ArrowField::new("c2", ArrowDataType::LargeBinary, false),
3192                            ArrowField::new_map(
3193                                "c3",
3194                                "entries",
3195                                ArrowFieldRef::new(ArrowField::new(
3196                                    "key",
3197                                    ArrowDataType::Utf8,
3198                                    false,
3199                                )),
3200                                ArrowFieldRef::new(ArrowField::new(
3201                                    "value",
3202                                    ArrowDataType::Utf8,
3203                                    false,
3204                                )),
3205                                false,
3206                                false,
3207                            ),
3208                        ])),
3209                        false,
3210                    ))),
3211                    false,
3212                )),
3213                false,
3214                false,
3215            ),
3216        ]);
3217        try_matches_arrow_schema(&risingwave_schema, &arrow_schema).unwrap();
3218    }
3219
    /// End-to-end parse test for a full `upsert` sink configuration: every
    /// recognized property must land in the right `IcebergConfig` field, and
    /// `catalog.jdbc.*` keys must be forwarded as java catalog properties
    /// with the `catalog.` prefix stripped.
    #[test]
    fn test_parse_iceberg_config() {
        let values = [
            ("connector", "iceberg"),
            ("type", "upsert"),
            ("primary_key", "v1"),
            ("partition_by", "v1, identity(v1), truncate(4,v2), bucket(5,v1), year(v3), month(v4), day(v5), hour(v6), void(v1)"),
            ("warehouse.path", "s3://iceberg"),
            ("s3.endpoint", "http://127.0.0.1:9301"),
            ("s3.access.key", "hummockadmin"),
            ("s3.secret.key", "hummockadmin"),
            ("s3.path.style.access", "true"),
            ("s3.region", "us-east-1"),
            ("catalog.type", "jdbc"),
            ("catalog.name", "demo"),
            ("catalog.uri", "jdbc://postgresql://postgres:5432/iceberg"),
            ("catalog.jdbc.user", "admin"),
            ("catalog.jdbc.password", "123456"),
            ("database.name", "demo_db"),
            ("table.name", "demo_table"),
            ("enable_compaction", "true"),
            ("compaction_interval_sec", "1800"),
            ("enable_snapshot_expiration", "true"),
        ]
        .into_iter()
        .map(|(k, v)| (k.to_owned(), v.to_owned()))
        .collect();

        let iceberg_config = IcebergConfig::from_btreemap(values).unwrap();

        let expected_iceberg_config = IcebergConfig {
            common: IcebergCommon {
                warehouse_path: Some("s3://iceberg".to_owned()),
                catalog_uri: Some("jdbc://postgresql://postgres:5432/iceberg".to_owned()),
                s3_region: Some("us-east-1".to_owned()),
                s3_endpoint: Some("http://127.0.0.1:9301".to_owned()),
                s3_access_key: Some("hummockadmin".to_owned()),
                s3_secret_key: Some("hummockadmin".to_owned()),
                s3_iam_role_arn: None,
                gcs_credential: None,
                catalog_type: Some("jdbc".to_owned()),
                glue_id: None,
                glue_region: None,
                glue_access_key: None,
                glue_secret_key: None,
                glue_iam_role_arn: None,
                catalog_name: Some("demo".to_owned()),
                s3_path_style_access: Some(true),
                catalog_credential: None,
                catalog_oauth2_server_uri: None,
                catalog_scope: None,
                catalog_token: None,
                enable_config_load: None,
                rest_signing_name: None,
                rest_signing_region: None,
                rest_sigv4_enabled: None,
                hosted_catalog: None,
                azblob_account_name: None,
                azblob_account_key: None,
                azblob_endpoint_url: None,
                catalog_header: None,
                adlsgen2_account_name: None,
                adlsgen2_account_key: None,
                adlsgen2_endpoint: None,
                vended_credentials: None,
                catalog_security: None,
                gcp_auth_scopes: None,
                catalog_io_impl: None,
            },
            table: IcebergTableIdentifier {
                database_name: Some("demo_db".to_owned()),
                table_name: "demo_table".to_owned(),
            },
            r#type: "upsert".to_owned(),
            force_append_only: false,
            primary_key: Some(vec!["v1".to_owned()]),
            partition_by: Some("v1, identity(v1), truncate(4,v2), bucket(5,v1), year(v3), month(v4), day(v5), hour(v6), void(v1)".to_owned()),
            // `catalog.jdbc.*` input keys above are expected to show up here
            // with the `catalog.` prefix removed.
            java_catalog_props: [("jdbc.user", "admin"), ("jdbc.password", "123456")]
                .into_iter()
                .map(|(k, v)| (k.to_owned(), v.to_owned()))
                .collect(),
            commit_checkpoint_interval: ICEBERG_DEFAULT_COMMIT_CHECKPOINT_INTERVAL,
            commit_checkpoint_size_threshold_mb: Some(
                ICEBERG_DEFAULT_COMMIT_CHECKPOINT_SIZE_THRESHOLD_MB,
            ),
            create_table_if_not_exists: false,
            is_exactly_once: Some(true),
            commit_retry_num: 8,
            enable_compaction: true,
            // "1800" from the input, expressed here as half the default
            // interval — assumes DEFAULT_ICEBERG_COMPACTION_INTERVAL == 3600s
            // (TODO confirm against the constant's definition).
            compaction_interval_sec: Some(DEFAULT_ICEBERG_COMPACTION_INTERVAL / 2),
            enable_snapshot_expiration: true,
            write_mode: IcebergWriteMode::MergeOnRead,
            format_version: FormatVersion::V2,
            snapshot_expiration_max_age_millis: None,
            snapshot_expiration_retain_last: None,
            snapshot_expiration_clear_expired_files: true,
            snapshot_expiration_clear_expired_meta_data: true,
            max_snapshots_num_before_compaction: None,
            small_files_threshold_mb: None,
            delete_files_count_threshold: None,
            trigger_snapshot_count: None,
            target_file_size_mb: None,
            compaction_type: None,
            write_parquet_compression: None,
            write_parquet_max_row_group_rows: None,
        };

        assert_eq!(iceberg_config, expected_iceberg_config);

        // The fully-qualified table name joins database and table with a dot.
        assert_eq!(
            &iceberg_config.full_table_name().unwrap().to_string(),
            "demo_db.demo_table"
        );
    }
3334
3335    #[test]
3336    fn test_parse_commit_checkpoint_size_threshold() {
3337        let values: BTreeMap<String, String> = [
3338            ("connector", "iceberg"),
3339            ("type", "append-only"),
3340            ("force_append_only", "true"),
3341            ("catalog.name", "test-catalog"),
3342            ("catalog.type", "storage"),
3343            ("warehouse.path", "s3://my-bucket/warehouse"),
3344            ("database.name", "test_db"),
3345            ("table.name", "test_table"),
3346            ("commit_checkpoint_size_threshold_mb", "128"),
3347        ]
3348        .into_iter()
3349        .map(|(k, v)| (k.to_owned(), v.to_owned()))
3350        .collect();
3351
3352        let config = IcebergConfig::from_btreemap(values).unwrap();
3353        assert_eq!(config.commit_checkpoint_size_threshold_mb, Some(128));
3354        assert_eq!(
3355            config.commit_checkpoint_size_threshold_bytes(),
3356            Some(128 * 1024 * 1024)
3357        );
3358    }
3359
3360    #[test]
3361    fn test_default_commit_checkpoint_size_threshold() {
3362        let values: BTreeMap<String, String> = [
3363            ("connector", "iceberg"),
3364            ("type", "append-only"),
3365            ("force_append_only", "true"),
3366            ("catalog.name", "test-catalog"),
3367            ("catalog.type", "storage"),
3368            ("warehouse.path", "s3://my-bucket/warehouse"),
3369            ("database.name", "test_db"),
3370            ("table.name", "test_table"),
3371        ]
3372        .into_iter()
3373        .map(|(k, v)| (k.to_owned(), v.to_owned()))
3374        .collect();
3375
3376        let config = IcebergConfig::from_btreemap(values).unwrap();
3377        assert_eq!(
3378            config.commit_checkpoint_size_threshold_mb,
3379            Some(ICEBERG_DEFAULT_COMMIT_CHECKPOINT_SIZE_THRESHOLD_MB)
3380        );
3381        assert_eq!(
3382            config.commit_checkpoint_size_threshold_bytes(),
3383            Some(ICEBERG_DEFAULT_COMMIT_CHECKPOINT_SIZE_THRESHOLD_MB * 1024 * 1024)
3384        );
3385    }
3386
3387    #[test]
3388    fn test_reject_zero_commit_checkpoint_size_threshold() {
3389        let values: BTreeMap<String, String> = [
3390            ("connector", "iceberg"),
3391            ("type", "append-only"),
3392            ("force_append_only", "true"),
3393            ("catalog.name", "test-catalog"),
3394            ("catalog.type", "storage"),
3395            ("warehouse.path", "s3://my-bucket/warehouse"),
3396            ("database.name", "test_db"),
3397            ("table.name", "test_table"),
3398            ("commit_checkpoint_size_threshold_mb", "0"),
3399        ]
3400        .into_iter()
3401        .map(|(k, v)| (k.to_owned(), v.to_owned()))
3402        .collect();
3403
3404        let err = IcebergConfig::from_btreemap(values).unwrap_err();
3405        assert!(
3406            err.to_string()
3407                .contains("`commit_checkpoint_size_threshold_mb` must be greater than 0")
3408        );
3409    }
3410
3411    async fn test_create_catalog(configs: BTreeMap<String, String>) {
3412        let iceberg_config = IcebergConfig::from_btreemap(configs).unwrap();
3413
3414        let _table = iceberg_config.load_table().await.unwrap();
3415    }
3416
3417    #[tokio::test]
3418    #[ignore]
3419    async fn test_storage_catalog() {
3420        let values = [
3421            ("connector", "iceberg"),
3422            ("type", "append-only"),
3423            ("force_append_only", "true"),
3424            ("s3.endpoint", "http://127.0.0.1:9301"),
3425            ("s3.access.key", "hummockadmin"),
3426            ("s3.secret.key", "hummockadmin"),
3427            ("s3.region", "us-east-1"),
3428            ("s3.path.style.access", "true"),
3429            ("catalog.name", "demo"),
3430            ("catalog.type", "storage"),
3431            ("warehouse.path", "s3://icebergdata/demo"),
3432            ("database.name", "s1"),
3433            ("table.name", "t1"),
3434        ]
3435        .into_iter()
3436        .map(|(k, v)| (k.to_owned(), v.to_owned()))
3437        .collect();
3438
3439        test_create_catalog(values).await;
3440    }
3441
3442    #[tokio::test]
3443    #[ignore]
3444    async fn test_rest_catalog() {
3445        let values = [
3446            ("connector", "iceberg"),
3447            ("type", "append-only"),
3448            ("force_append_only", "true"),
3449            ("s3.endpoint", "http://127.0.0.1:9301"),
3450            ("s3.access.key", "hummockadmin"),
3451            ("s3.secret.key", "hummockadmin"),
3452            ("s3.region", "us-east-1"),
3453            ("s3.path.style.access", "true"),
3454            ("catalog.name", "demo"),
3455            ("catalog.type", "rest"),
3456            ("catalog.uri", "http://192.168.167.4:8181"),
3457            ("warehouse.path", "s3://icebergdata/demo"),
3458            ("database.name", "s1"),
3459            ("table.name", "t1"),
3460        ]
3461        .into_iter()
3462        .map(|(k, v)| (k.to_owned(), v.to_owned()))
3463        .collect();
3464
3465        test_create_catalog(values).await;
3466    }
3467
3468    #[tokio::test]
3469    #[ignore]
3470    async fn test_jdbc_catalog() {
3471        let values = [
3472            ("connector", "iceberg"),
3473            ("type", "append-only"),
3474            ("force_append_only", "true"),
3475            ("s3.endpoint", "http://127.0.0.1:9301"),
3476            ("s3.access.key", "hummockadmin"),
3477            ("s3.secret.key", "hummockadmin"),
3478            ("s3.region", "us-east-1"),
3479            ("s3.path.style.access", "true"),
3480            ("catalog.name", "demo"),
3481            ("catalog.type", "jdbc"),
3482            ("catalog.uri", "jdbc:postgresql://localhost:5432/iceberg"),
3483            ("catalog.jdbc.user", "admin"),
3484            ("catalog.jdbc.password", "123456"),
3485            ("warehouse.path", "s3://icebergdata/demo"),
3486            ("database.name", "s1"),
3487            ("table.name", "t1"),
3488        ]
3489        .into_iter()
3490        .map(|(k, v)| (k.to_owned(), v.to_owned()))
3491        .collect();
3492
3493        test_create_catalog(values).await;
3494    }
3495
3496    #[tokio::test]
3497    #[ignore]
3498    async fn test_hive_catalog() {
3499        let values = [
3500            ("connector", "iceberg"),
3501            ("type", "append-only"),
3502            ("force_append_only", "true"),
3503            ("s3.endpoint", "http://127.0.0.1:9301"),
3504            ("s3.access.key", "hummockadmin"),
3505            ("s3.secret.key", "hummockadmin"),
3506            ("s3.region", "us-east-1"),
3507            ("s3.path.style.access", "true"),
3508            ("catalog.name", "demo"),
3509            ("catalog.type", "hive"),
3510            ("catalog.uri", "thrift://localhost:9083"),
3511            ("warehouse.path", "s3://icebergdata/demo"),
3512            ("database.name", "s1"),
3513            ("table.name", "t1"),
3514        ]
3515        .into_iter()
3516        .map(|(k, v)| (k.to_owned(), v.to_owned()))
3517        .collect();
3518
3519        test_create_catalog(values).await;
3520    }
3521
    /// Test parsing Google/BigLake authentication configuration.
    ///
    /// Verifies that `catalog.security = google`, the custom GCP auth scopes,
    /// the BigQuery-style warehouse path, and the extra catalog header all
    /// survive parsing unchanged.
    #[test]
    fn test_parse_google_auth_config() {
        let values: BTreeMap<String, String> = [
            ("connector", "iceberg"),
            ("type", "append-only"),
            ("force_append_only", "true"),
            ("catalog.name", "biglake-catalog"),
            ("catalog.type", "rest"),
            ("catalog.uri", "https://biglake.googleapis.com/iceberg/v1/restcatalog"),
            ("warehouse.path", "bq://projects/my-gcp-project"),
            ("catalog.header", "x-goog-user-project=my-gcp-project"),
            ("catalog.security", "google"),
            ("gcp.auth.scopes", "https://www.googleapis.com/auth/cloud-platform,https://www.googleapis.com/auth/bigquery"),
            ("database.name", "my_dataset"),
            ("table.name", "my_table"),
        ]
        .into_iter()
        .map(|(k, v)| (k.to_owned(), v.to_owned()))
        .collect();

        let config = IcebergConfig::from_btreemap(values).unwrap();
        assert_eq!(config.catalog_type(), "rest");
        assert_eq!(config.common.catalog_security.as_deref(), Some("google"));
        // The comma-separated scope list is stored as a single raw string.
        assert_eq!(
            config.common.gcp_auth_scopes.as_deref(),
            Some(
                "https://www.googleapis.com/auth/cloud-platform,https://www.googleapis.com/auth/bigquery"
            )
        );
        assert_eq!(
            config.common.warehouse_path.as_deref(),
            Some("bq://projects/my-gcp-project")
        );
        assert_eq!(
            config.common.catalog_header.as_deref(),
            Some("x-goog-user-project=my-gcp-project")
        );
    }
3561
    /// Test parsing `oauth2` security configuration.
    ///
    /// All five OAuth2-related properties (security mode, credential, token,
    /// server URI, and scope) should round-trip into `IcebergCommon`.
    #[test]
    fn test_parse_oauth2_security_config() {
        let values: BTreeMap<String, String> = [
            ("connector", "iceberg"),
            ("type", "append-only"),
            ("force_append_only", "true"),
            ("catalog.name", "oauth2-catalog"),
            ("catalog.type", "rest"),
            ("catalog.uri", "https://example.com/iceberg/rest"),
            ("warehouse.path", "s3://my-bucket/warehouse"),
            ("catalog.security", "oauth2"),
            ("catalog.credential", "client_id:client_secret"),
            ("catalog.token", "bearer-token"),
            (
                "catalog.oauth2_server_uri",
                "https://oauth.example.com/token",
            ),
            ("catalog.scope", "read write"),
            ("database.name", "test_db"),
            ("table.name", "test_table"),
        ]
        .into_iter()
        .map(|(k, v)| (k.to_owned(), v.to_owned()))
        .collect();

        let iceberg_config = IcebergConfig::from_btreemap(values).unwrap();

        // Verify catalog type
        assert_eq!(iceberg_config.catalog_type(), "rest");

        // Verify OAuth2-specific options
        assert_eq!(
            iceberg_config.common.catalog_security.as_deref(),
            Some("oauth2")
        );
        assert_eq!(
            iceberg_config.common.catalog_credential.as_deref(),
            Some("client_id:client_secret")
        );
        assert_eq!(
            iceberg_config.common.catalog_token.as_deref(),
            Some("bearer-token")
        );
        assert_eq!(
            iceberg_config.common.catalog_oauth2_server_uri.as_deref(),
            Some("https://oauth.example.com/token")
        );
        assert_eq!(
            iceberg_config.common.catalog_scope.as_deref(),
            Some("read write")
        );
    }
3615
3616    /// Test parsing invalid security configuration.
3617    #[test]
3618    fn test_parse_invalid_security_config() {
3619        let values: BTreeMap<String, String> = [
3620            ("connector", "iceberg"),
3621            ("type", "append-only"),
3622            ("force_append_only", "true"),
3623            ("catalog.name", "invalid-catalog"),
3624            ("catalog.type", "rest"),
3625            ("catalog.uri", "https://example.com/iceberg/rest"),
3626            ("warehouse.path", "s3://my-bucket/warehouse"),
3627            ("catalog.security", "invalid_security_type"),
3628            ("database.name", "test_db"),
3629            ("table.name", "test_table"),
3630        ]
3631        .into_iter()
3632        .map(|(k, v)| (k.to_owned(), v.to_owned()))
3633        .collect();
3634
3635        // This should still parse successfully, but with a warning for unknown security type
3636        let iceberg_config = IcebergConfig::from_btreemap(values).unwrap();
3637
3638        // Verify that the invalid security type is still stored
3639        assert_eq!(
3640            iceberg_config.common.catalog_security.as_deref(),
3641            Some("invalid_security_type")
3642        );
3643
3644        // Verify catalog type
3645        assert_eq!(iceberg_config.catalog_type(), "rest");
3646    }
3647
    /// Test parsing custom `FileIO` implementation configuration.
    ///
    /// `catalog.io_impl` (a Java class name handed to the catalog) should be
    /// stored verbatim alongside the `google` security mode.
    #[test]
    fn test_parse_custom_io_impl_config() {
        let values: BTreeMap<String, String> = [
            ("connector", "iceberg"),
            ("type", "append-only"),
            ("force_append_only", "true"),
            ("catalog.name", "gcs-catalog"),
            ("catalog.type", "rest"),
            ("catalog.uri", "https://example.com/iceberg/rest"),
            ("warehouse.path", "gs://my-bucket/warehouse"),
            ("catalog.security", "google"),
            ("catalog.io_impl", "org.apache.iceberg.gcp.gcs.GCSFileIO"),
            ("database.name", "test_db"),
            ("table.name", "test_table"),
        ]
        .into_iter()
        .map(|(k, v)| (k.to_owned(), v.to_owned()))
        .collect();

        let iceberg_config = IcebergConfig::from_btreemap(values).unwrap();

        // Verify catalog type
        assert_eq!(iceberg_config.catalog_type(), "rest");

        // Verify custom `FileIO` implementation
        assert_eq!(
            iceberg_config.common.catalog_io_impl.as_deref(),
            Some("org.apache.iceberg.gcp.gcs.GCSFileIO")
        );

        // Verify Google security is set
        assert_eq!(
            iceberg_config.common.catalog_security.as_deref(),
            Some("google")
        );
    }
3685
3686    #[test]
3687    fn test_config_constants_consistency() {
3688        // This test ensures our constants match the expected configuration names
3689        // If you change a constant, this test will remind you to update both places
3690        assert_eq!(ENABLE_COMPACTION, "enable_compaction");
3691        assert_eq!(COMPACTION_INTERVAL_SEC, "compaction_interval_sec");
3692        assert_eq!(ENABLE_SNAPSHOT_EXPIRATION, "enable_snapshot_expiration");
3693        assert_eq!(WRITE_MODE, "write_mode");
3694        assert_eq!(
3695            COMMIT_CHECKPOINT_SIZE_THRESHOLD_MB,
3696            "commit_checkpoint_size_threshold_mb"
3697        );
3698        assert_eq!(
3699            SNAPSHOT_EXPIRATION_RETAIN_LAST,
3700            "snapshot_expiration_retain_last"
3701        );
3702        assert_eq!(
3703            SNAPSHOT_EXPIRATION_MAX_AGE_MILLIS,
3704            "snapshot_expiration_max_age_millis"
3705        );
3706        assert_eq!(
3707            SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_FILES,
3708            "snapshot_expiration_clear_expired_files"
3709        );
3710        assert_eq!(
3711            SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_META_DATA,
3712            "snapshot_expiration_clear_expired_meta_data"
3713        );
3714        assert_eq!(COMPACTION_MAX_SNAPSHOTS_NUM, "compaction.max_snapshots_num");
3715    }
3716
    /// Test parsing all compaction.* prefix configs and their default values.
    ///
    /// First half: every `compaction.*` option set explicitly must appear in
    /// the parsed config (both raw `Option` fields and getter methods).
    /// Second half: with no `compaction.*` options, the getters fall back to
    /// their documented defaults (1024 MiB target file size, zstd, 122880
    /// rows per row group).
    #[test]
    fn test_parse_compaction_config() {
        // Test with all compaction configs specified
        let values: BTreeMap<String, String> = [
            ("connector", "iceberg"),
            ("type", "upsert"),
            ("primary_key", "id"),
            ("warehouse.path", "s3://iceberg"),
            ("s3.endpoint", "http://127.0.0.1:9301"),
            ("s3.access.key", "test"),
            ("s3.secret.key", "test"),
            ("s3.region", "us-east-1"),
            ("catalog.type", "storage"),
            ("catalog.name", "demo"),
            ("database.name", "test_db"),
            ("table.name", "test_table"),
            ("enable_compaction", "true"),
            ("compaction.max_snapshots_num", "100"),
            ("compaction.small_files_threshold_mb", "512"),
            ("compaction.delete_files_count_threshold", "50"),
            ("compaction.trigger_snapshot_count", "10"),
            ("compaction.target_file_size_mb", "256"),
            ("compaction.type", "full"),
            ("compaction.write_parquet_compression", "zstd"),
            ("compaction.write_parquet_max_row_group_rows", "50000"),
        ]
        .into_iter()
        .map(|(k, v)| (k.to_owned(), v.to_owned()))
        .collect();

        let config = IcebergConfig::from_btreemap(values).unwrap();
        assert!(config.enable_compaction);
        assert_eq!(config.max_snapshots_num_before_compaction, Some(100));
        assert_eq!(config.small_files_threshold_mb, Some(512));
        assert_eq!(config.delete_files_count_threshold, Some(50));
        assert_eq!(config.trigger_snapshot_count, Some(10));
        assert_eq!(config.target_file_size_mb, Some(256));
        assert_eq!(config.compaction_type, Some(CompactionType::Full));
        // Getter methods unwrap the options when they are set.
        assert_eq!(config.target_file_size_mb(), 256);
        assert_eq!(config.write_parquet_compression(), "zstd");
        assert_eq!(config.write_parquet_max_row_group_rows(), 50000);

        // Test default values (no compaction configs specified)
        let values: BTreeMap<String, String> = [
            ("connector", "iceberg"),
            ("type", "append-only"),
            ("force_append_only", "true"),
            ("catalog.name", "test-catalog"),
            ("catalog.type", "storage"),
            ("warehouse.path", "s3://my-bucket/warehouse"),
            ("database.name", "test_db"),
            ("table.name", "test_table"),
        ]
        .into_iter()
        .map(|(k, v)| (k.to_owned(), v.to_owned()))
        .collect();

        let config = IcebergConfig::from_btreemap(values).unwrap();
        assert_eq!(config.target_file_size_mb(), 1024); // Default
        assert_eq!(config.write_parquet_compression(), "zstd"); // Default
        assert_eq!(config.write_parquet_max_row_group_rows(), 122880); // Default
    }
3780
    /// Test parquet compression parsing.
    ///
    /// Covers every accepted codec name, case-insensitivity, and the
    /// fallback to SNAPPY for unrecognized inputs.
    #[test]
    fn test_parse_parquet_compression() {
        use parquet::basic::Compression;

        use super::parse_parquet_compression;

        // Test valid compression types
        assert!(matches!(
            parse_parquet_compression("snappy"),
            Compression::SNAPPY
        ));
        // GZIP/ZSTD/BROTLI variants carry a codec level, so only the variant
        // (not the level) is checked here.
        assert!(matches!(
            parse_parquet_compression("gzip"),
            Compression::GZIP(_)
        ));
        assert!(matches!(
            parse_parquet_compression("zstd"),
            Compression::ZSTD(_)
        ));
        assert!(matches!(parse_parquet_compression("lz4"), Compression::LZ4));
        assert!(matches!(
            parse_parquet_compression("brotli"),
            Compression::BROTLI(_)
        ));
        assert!(matches!(
            parse_parquet_compression("uncompressed"),
            Compression::UNCOMPRESSED
        ));

        // Test case insensitivity
        assert!(matches!(
            parse_parquet_compression("SNAPPY"),
            Compression::SNAPPY
        ));
        assert!(matches!(
            parse_parquet_compression("Zstd"),
            Compression::ZSTD(_)
        ));

        // Test invalid compression (should fall back to SNAPPY)
        assert!(matches!(
            parse_parquet_compression("invalid"),
            Compression::SNAPPY
        ));
    }
3827
3828    #[test]
3829    fn test_append_only_rejects_copy_on_write() {
3830        // Test that append-only sinks reject copy-on-write mode
3831        let values = [
3832            ("connector", "iceberg"),
3833            ("type", "append-only"),
3834            ("warehouse.path", "s3://iceberg"),
3835            ("s3.endpoint", "http://127.0.0.1:9301"),
3836            ("s3.access.key", "test"),
3837            ("s3.secret.key", "test"),
3838            ("s3.region", "us-east-1"),
3839            ("catalog.type", "storage"),
3840            ("catalog.name", "demo"),
3841            ("database.name", "test_db"),
3842            ("table.name", "test_table"),
3843            ("write_mode", "copy-on-write"),
3844        ]
3845        .into_iter()
3846        .map(|(k, v)| (k.to_owned(), v.to_owned()))
3847        .collect();
3848
3849        let result = IcebergConfig::from_btreemap(values);
3850        assert!(result.is_err());
3851        assert!(
3852            result
3853                .unwrap_err()
3854                .to_string()
3855                .contains("'copy-on-write' mode is not supported for append-only iceberg sink")
3856        );
3857    }
3858
3859    #[test]
3860    fn test_append_only_accepts_merge_on_read() {
3861        // Test that append-only sinks accept merge-on-read mode (explicit)
3862        let values = [
3863            ("connector", "iceberg"),
3864            ("type", "append-only"),
3865            ("warehouse.path", "s3://iceberg"),
3866            ("s3.endpoint", "http://127.0.0.1:9301"),
3867            ("s3.access.key", "test"),
3868            ("s3.secret.key", "test"),
3869            ("s3.region", "us-east-1"),
3870            ("catalog.type", "storage"),
3871            ("catalog.name", "demo"),
3872            ("database.name", "test_db"),
3873            ("table.name", "test_table"),
3874            ("write_mode", "merge-on-read"),
3875        ]
3876        .into_iter()
3877        .map(|(k, v)| (k.to_owned(), v.to_owned()))
3878        .collect();
3879
3880        let result = IcebergConfig::from_btreemap(values);
3881        assert!(result.is_ok());
3882        let config = result.unwrap();
3883        assert_eq!(config.write_mode, IcebergWriteMode::MergeOnRead);
3884    }
3885
3886    #[test]
3887    fn test_append_only_defaults_to_merge_on_read() {
3888        // Test that append-only sinks default to merge-on-read when write_mode is not specified
3889        let values = [
3890            ("connector", "iceberg"),
3891            ("type", "append-only"),
3892            ("warehouse.path", "s3://iceberg"),
3893            ("s3.endpoint", "http://127.0.0.1:9301"),
3894            ("s3.access.key", "test"),
3895            ("s3.secret.key", "test"),
3896            ("s3.region", "us-east-1"),
3897            ("catalog.type", "storage"),
3898            ("catalog.name", "demo"),
3899            ("database.name", "test_db"),
3900            ("table.name", "test_table"),
3901        ]
3902        .into_iter()
3903        .map(|(k, v)| (k.to_owned(), v.to_owned()))
3904        .collect();
3905
3906        let result = IcebergConfig::from_btreemap(values);
3907        assert!(result.is_ok());
3908        let config = result.unwrap();
3909        assert_eq!(config.write_mode, IcebergWriteMode::MergeOnRead);
3910    }
3911
3912    #[test]
3913    fn test_upsert_accepts_copy_on_write() {
3914        // Test that upsert sinks accept copy-on-write mode
3915        let values = [
3916            ("connector", "iceberg"),
3917            ("type", "upsert"),
3918            ("primary_key", "id"),
3919            ("warehouse.path", "s3://iceberg"),
3920            ("s3.endpoint", "http://127.0.0.1:9301"),
3921            ("s3.access.key", "test"),
3922            ("s3.secret.key", "test"),
3923            ("s3.region", "us-east-1"),
3924            ("catalog.type", "storage"),
3925            ("catalog.name", "demo"),
3926            ("database.name", "test_db"),
3927            ("table.name", "test_table"),
3928            ("write_mode", "copy-on-write"),
3929        ]
3930        .into_iter()
3931        .map(|(k, v)| (k.to_owned(), v.to_owned()))
3932        .collect();
3933
3934        let result = IcebergConfig::from_btreemap(values);
3935        assert!(result.is_ok());
3936        let config = result.unwrap();
3937        assert_eq!(config.write_mode, IcebergWriteMode::CopyOnWrite);
3938    }
3939}