risingwave_connector/sink/iceberg/
mod.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod prometheus;
16use std::collections::{BTreeMap, HashMap};
17use std::fmt::Debug;
18use std::num::NonZeroU64;
19use std::str::FromStr;
20use std::sync::Arc;
21use std::time::Duration;
22
23use anyhow::{Context, anyhow};
24use async_trait::async_trait;
25use await_tree::InstrumentAwait;
26use iceberg::arrow::{
27    RecordBatchPartitionSplitter, arrow_schema_to_schema, schema_to_arrow_schema,
28};
29use iceberg::spec::{
30    DataFile, FormatVersion, MAIN_BRANCH, Operation, PartitionSpecRef,
31    SchemaRef as IcebergSchemaRef, SerializedDataFile, TableProperties, Transform,
32    UnboundPartitionField, UnboundPartitionSpec,
33};
34use iceberg::table::Table;
35use iceberg::transaction::{ApplyTransactionAction, FastAppendAction, Transaction};
36use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
37use iceberg::writer::base_writer::deletion_vector_writer::{
38    DeletionVectorWriter, DeletionVectorWriterBuilder,
39};
40use iceberg::writer::base_writer::equality_delete_writer::{
41    EqualityDeleteFileWriterBuilder, EqualityDeleteWriterConfig,
42};
43use iceberg::writer::base_writer::position_delete_file_writer::{
44    POSITION_DELETE_SCHEMA, PositionDeleteFileWriter, PositionDeleteFileWriterBuilder,
45    PositionDeleteInput,
46};
47use iceberg::writer::delta_writer::{DELETE_OP, DeltaWriterBuilder, INSERT_OP};
48use iceberg::writer::file_writer::ParquetWriterBuilder;
49use iceberg::writer::file_writer::location_generator::{
50    DefaultFileNameGenerator, DefaultLocationGenerator,
51};
52use iceberg::writer::file_writer::rolling_writer::RollingFileWriterBuilder;
53use iceberg::writer::task_writer::TaskWriter;
54use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
55use iceberg::{Catalog, NamespaceIdent, TableCreation, TableIdent};
56use itertools::Itertools;
57use parquet::basic::Compression;
58use parquet::file::properties::WriterProperties;
59use prometheus::monitored_general_writer::MonitoredGeneralWriterBuilder;
60use regex::Regex;
61use risingwave_common::array::arrow::arrow_array_iceberg::{Int32Array, RecordBatch};
62use risingwave_common::array::arrow::arrow_schema_iceberg::{
63    self, DataType as ArrowDataType, Field as ArrowField, Fields as ArrowFields,
64    Schema as ArrowSchema, SchemaRef,
65};
66use risingwave_common::array::arrow::{IcebergArrowConvert, IcebergCreateTableArrowConvert};
67use risingwave_common::array::{Op, StreamChunk};
68use risingwave_common::bail;
69use risingwave_common::bitmap::Bitmap;
70use risingwave_common::catalog::{Field, Schema};
71use risingwave_common::error::IcebergError;
72use risingwave_common::metrics::{LabelGuardedHistogram, LabelGuardedIntCounter};
73use risingwave_common_estimate_size::EstimateSize;
74use risingwave_pb::connector_service::SinkMetadata;
75use risingwave_pb::connector_service::sink_metadata::Metadata::Serialized;
76use risingwave_pb::connector_service::sink_metadata::SerializedMetadata;
77use risingwave_pb::stream_plan::PbSinkSchemaChange;
78use serde::de::{self, Deserializer, Visitor};
79use serde::{Deserialize, Serialize};
80use serde_json::from_value;
81use serde_with::{DisplayFromStr, serde_as};
82use thiserror_ext::AsReport;
83use tokio::sync::mpsc::UnboundedSender;
84use tokio_retry::RetryIf;
85use tokio_retry::strategy::{ExponentialBackoff, jitter};
86use tracing::warn;
87use url::Url;
88use uuid::Uuid;
89use with_options::WithOptions;
90
91use super::decouple_checkpoint_log_sink::iceberg_default_commit_checkpoint_interval;
92use super::{
93    GLOBAL_SINK_METRICS, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT, Sink,
94    SinkError, SinkWriterParam,
95};
96use crate::connector_common::{IcebergCommon, IcebergSinkCompactionUpdate, IcebergTableIdentifier};
97use crate::enforce_secret::EnforceSecret;
98use crate::sink::catalog::SinkId;
99use crate::sink::coordinate::CoordinatedLogSinker;
100use crate::sink::writer::SinkWriter;
101use crate::sink::{
102    Result, SinglePhaseCommitCoordinator, SinkCommitCoordinator, SinkParam,
103    TwoPhaseCommitCoordinator,
104};
105use crate::{deserialize_bool_from_string, deserialize_optional_string_seq_from_string};
106
pub const ICEBERG_SINK: &str = "iceberg";

// NOTE(review): "ingestion" — presumably the staging branch used by the
// copy-on-write path; confirm against the writer/commit code.
pub const ICEBERG_COW_BRANCH: &str = "ingestion";
/// Accepted values for the `write_mode` option.
pub const ICEBERG_WRITE_MODE_MERGE_ON_READ: &str = "merge-on-read";
pub const ICEBERG_WRITE_MODE_COPY_ON_WRITE: &str = "copy-on-write";
/// Accepted values for the `compaction.type` option.
pub const ICEBERG_COMPACTION_TYPE_FULL: &str = "full";
pub const ICEBERG_COMPACTION_TYPE_SMALL_FILES: &str = "small-files";
pub const ICEBERG_COMPACTION_TYPE_FILES_WITH_DELETE: &str = "files-with-delete";

/// First field id assigned to partition fields at table creation; subsequent
/// partition fields get consecutive ids from here.
pub const PARTITION_DATA_ID_START: i32 = 1000;
117
/// Write mode of the Iceberg sink.
///
/// Serialized in kebab-case, matching the `ICEBERG_WRITE_MODE_*` constants
/// (`merge-on-read`, `copy-on-write`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
pub enum IcebergWriteMode {
    /// Default write mode.
    #[default]
    MergeOnRead,
    CopyOnWrite,
}
125
126impl IcebergWriteMode {
127    pub fn as_str(self) -> &'static str {
128        match self {
129            IcebergWriteMode::MergeOnRead => ICEBERG_WRITE_MODE_MERGE_ON_READ,
130            IcebergWriteMode::CopyOnWrite => ICEBERG_WRITE_MODE_COPY_ON_WRITE,
131        }
132    }
133}
134
135impl std::str::FromStr for IcebergWriteMode {
136    type Err = SinkError;
137
138    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
139        match s {
140            ICEBERG_WRITE_MODE_MERGE_ON_READ => Ok(IcebergWriteMode::MergeOnRead),
141            ICEBERG_WRITE_MODE_COPY_ON_WRITE => Ok(IcebergWriteMode::CopyOnWrite),
142            _ => Err(SinkError::Config(anyhow!(format!(
143                "invalid write_mode: {}, must be one of: {}, {}",
144                s, ICEBERG_WRITE_MODE_MERGE_ON_READ, ICEBERG_WRITE_MODE_COPY_ON_WRITE
145            )))),
146        }
147    }
148}
149
150impl TryFrom<&str> for IcebergWriteMode {
151    type Error = <Self as std::str::FromStr>::Err;
152
153    fn try_from(value: &str) -> std::result::Result<Self, Self::Error> {
154        value.parse()
155    }
156}
157
158impl TryFrom<String> for IcebergWriteMode {
159    type Error = <Self as std::str::FromStr>::Err;
160
161    fn try_from(value: String) -> std::result::Result<Self, Self::Error> {
162        value.as_str().parse()
163    }
164}
165
impl std::fmt::Display for IcebergWriteMode {
    // Display mirrors `as_str`, so `to_string()` yields the kebab-case form.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(self.as_str())
    }
}
171
// Configuration constants: string keys of the sink's user-facing options.
pub const ENABLE_COMPACTION: &str = "enable_compaction";
pub const COMPACTION_INTERVAL_SEC: &str = "compaction_interval_sec";
pub const ENABLE_SNAPSHOT_EXPIRATION: &str = "enable_snapshot_expiration";
pub const WRITE_MODE: &str = "write_mode";
pub const FORMAT_VERSION: &str = "format_version";
pub const SNAPSHOT_EXPIRATION_RETAIN_LAST: &str = "snapshot_expiration_retain_last";
pub const SNAPSHOT_EXPIRATION_MAX_AGE_MILLIS: &str = "snapshot_expiration_max_age_millis";
pub const SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_FILES: &str = "snapshot_expiration_clear_expired_files";
pub const SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_META_DATA: &str =
    "snapshot_expiration_clear_expired_meta_data";
pub const COMPACTION_MAX_SNAPSHOTS_NUM: &str = "compaction.max_snapshots_num";

pub const COMPACTION_SMALL_FILES_THRESHOLD_MB: &str = "compaction.small_files_threshold_mb";

pub const COMPACTION_DELETE_FILES_COUNT_THRESHOLD: &str = "compaction.delete_files_count_threshold";

pub const COMPACTION_TRIGGER_SNAPSHOT_COUNT: &str = "compaction.trigger_snapshot_count";

pub const COMPACTION_TARGET_FILE_SIZE_MB: &str = "compaction.target_file_size_mb";

pub const COMPACTION_TYPE: &str = "compaction.type";

pub const COMPACTION_WRITE_PARQUET_COMPRESSION: &str = "compaction.write_parquet_compression";
pub const COMPACTION_WRITE_PARQUET_MAX_ROW_GROUP_ROWS: &str =
    "compaction.write_parquet_max_row_group_rows";

// NOTE(review): presumably written as the parquet `created_by` metadata —
// confirm at the `WriterProperties` construction site.
const PARQUET_CREATED_BY: &str = concat!("risingwave version ", env!("CARGO_PKG_VERSION"));
200
/// Serde default for `commit_retry_num`: retry a failed commit up to 8 times.
fn default_commit_retry_num() -> u32 {
    8
}

/// Serde default for `write_mode`: merge-on-read.
fn default_iceberg_write_mode() -> IcebergWriteMode {
    IcebergWriteMode::MergeOnRead
}

/// Serde default for `format_version`: Iceberg format v2.
fn default_iceberg_format_version() -> FormatVersion {
    FormatVersion::V2
}

/// Serde default helper returning `true`.
fn default_true() -> bool {
    true
}

/// Serde default helper returning `Some(true)`.
fn default_some_true() -> Option<bool> {
    Some(true)
}
220
221fn parse_format_version_str(value: &str) -> std::result::Result<FormatVersion, String> {
222    let parsed = value
223        .trim()
224        .parse::<u8>()
225        .map_err(|_| "`format-version` must be one of 1, 2, or 3".to_owned())?;
226    match parsed {
227        1 => Ok(FormatVersion::V1),
228        2 => Ok(FormatVersion::V2),
229        3 => Ok(FormatVersion::V3),
230        _ => Err("`format-version` must be one of 1, 2, or 3".to_owned()),
231    }
232}
233
234fn deserialize_format_version<'de, D>(
235    deserializer: D,
236) -> std::result::Result<FormatVersion, D::Error>
237where
238    D: Deserializer<'de>,
239{
240    struct FormatVersionVisitor;
241
242    impl<'de> Visitor<'de> for FormatVersionVisitor {
243        type Value = FormatVersion;
244
245        fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
246            formatter.write_str("format-version as 1, 2, or 3")
247        }
248
249        fn visit_u64<E>(self, value: u64) -> std::result::Result<Self::Value, E>
250        where
251            E: de::Error,
252        {
253            let value = u8::try_from(value)
254                .map_err(|_| E::custom("`format-version` must be one of 1, 2, or 3"))?;
255            parse_format_version_str(&value.to_string()).map_err(E::custom)
256        }
257
258        fn visit_i64<E>(self, value: i64) -> std::result::Result<Self::Value, E>
259        where
260            E: de::Error,
261        {
262            let value = u8::try_from(value)
263                .map_err(|_| E::custom("`format-version` must be one of 1, 2, or 3"))?;
264            parse_format_version_str(&value.to_string()).map_err(E::custom)
265        }
266
267        fn visit_str<E>(self, value: &str) -> std::result::Result<Self::Value, E>
268        where
269            E: de::Error,
270        {
271            parse_format_version_str(value).map_err(E::custom)
272        }
273
274        fn visit_string<E>(self, value: String) -> std::result::Result<Self::Value, E>
275        where
276            E: de::Error,
277        {
278            self.visit_str(&value)
279        }
280    }
281
282    deserializer.deserialize_any(FormatVersionVisitor)
283}
284
/// Compaction type for Iceberg sink
///
/// Serialized in kebab-case (`full`, `small-files`, `files-with-delete`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(rename_all = "kebab-case")]
pub enum CompactionType {
    /// Full compaction - rewrites all data files
    #[default]
    Full,
    /// Small files compaction - only compact small files
    SmallFiles,
    /// Files with delete compaction - only compact files that have associated delete files
    FilesWithDelete,
}
297
298impl CompactionType {
299    pub fn as_str(&self) -> &'static str {
300        match self {
301            CompactionType::Full => ICEBERG_COMPACTION_TYPE_FULL,
302            CompactionType::SmallFiles => ICEBERG_COMPACTION_TYPE_SMALL_FILES,
303            CompactionType::FilesWithDelete => ICEBERG_COMPACTION_TYPE_FILES_WITH_DELETE,
304        }
305    }
306}
307
308impl std::str::FromStr for CompactionType {
309    type Err = SinkError;
310
311    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
312        match s {
313            ICEBERG_COMPACTION_TYPE_FULL => Ok(CompactionType::Full),
314            ICEBERG_COMPACTION_TYPE_SMALL_FILES => Ok(CompactionType::SmallFiles),
315            ICEBERG_COMPACTION_TYPE_FILES_WITH_DELETE => Ok(CompactionType::FilesWithDelete),
316            _ => Err(SinkError::Config(anyhow!(format!(
317                "invalid compaction_type: {}, must be one of: {}, {}, {}",
318                s,
319                ICEBERG_COMPACTION_TYPE_FULL,
320                ICEBERG_COMPACTION_TYPE_SMALL_FILES,
321                ICEBERG_COMPACTION_TYPE_FILES_WITH_DELETE
322            )))),
323        }
324    }
325}
326
327impl TryFrom<&str> for CompactionType {
328    type Error = <Self as std::str::FromStr>::Err;
329
330    fn try_from(value: &str) -> std::result::Result<Self, Self::Error> {
331        value.parse()
332    }
333}
334
335impl TryFrom<String> for CompactionType {
336    type Error = <Self as std::str::FromStr>::Err;
337
338    fn try_from(value: String) -> std::result::Result<Self, Self::Error> {
339        value.as_str().parse()
340    }
341}
342
343impl std::fmt::Display for CompactionType {
344    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
345        write!(f, "{}", self.as_str())
346    }
347}
348
#[serde_as]
#[derive(Debug, Clone, PartialEq, Eq, Deserialize, WithOptions)]
pub struct IcebergConfig {
    /// Sink type; `from_btreemap` rejects anything other than "append-only" or "upsert".
    pub r#type: String, // accept "append-only" or "upsert"

    /// Treat the stream as append-only even if it contains updates/deletes.
    #[serde(default, deserialize_with = "deserialize_bool_from_string")]
    pub force_append_only: bool,

    /// Common Iceberg catalog/warehouse options, flattened into the top level.
    #[serde(flatten)]
    common: IcebergCommon,

    /// Target table identifier options, flattened into the top level.
    #[serde(flatten)]
    table: IcebergTableIdentifier,

    /// Upsert primary-key columns; must be set and non-empty for `upsert` sinks.
    #[serde(
        rename = "primary_key",
        default,
        deserialize_with = "deserialize_optional_string_seq_from_string"
    )]
    pub primary_key: Option<Vec<String>>,

    // Props for java catalog props.
    // Filled by `from_btreemap` from every `catalog.*` key except
    // `catalog.uri` / `catalog.type` / `catalog.name` / `catalog.header`.
    #[serde(skip)]
    pub java_catalog_props: HashMap<String, String>,

    /// Partition spec expression used at table creation; parsed by
    /// `parse_partition_by_exprs` into (column, transform) pairs.
    #[serde(default)]
    pub partition_by: Option<String>,

    /// Commit every n(>0) checkpoints, default is 60.
    #[serde(default = "iceberg_default_commit_checkpoint_interval")]
    #[serde_as(as = "DisplayFromStr")]
    #[with_option(allow_alter_on_fly)]
    pub commit_checkpoint_interval: u64,

    /// Create the target table on startup when it does not exist yet.
    #[serde(default, deserialize_with = "deserialize_bool_from_string")]
    pub create_table_if_not_exists: bool,

    /// Whether it is `exactly_once`, the default is true.
    #[serde(default = "default_some_true")]
    #[serde_as(as = "Option<DisplayFromStr>")]
    pub is_exactly_once: Option<bool>,
    // Retry commit num when iceberg commit fail. default is 8.
    // # TODO
    // Iceberg table may store the retry commit num in table meta.
    // We should try to find and use that as default commit retry num first.
    #[serde(default = "default_commit_retry_num")]
    pub commit_retry_num: u32,

    /// Whether to enable iceberg compaction.
    #[serde(
        rename = "enable_compaction",
        default,
        deserialize_with = "deserialize_bool_from_string"
    )]
    #[with_option(allow_alter_on_fly)]
    pub enable_compaction: bool,

    /// The interval of iceberg compaction in seconds;
    /// `compaction_interval_sec()` defaults this to 3600 when unset.
    #[serde(rename = "compaction_interval_sec", default)]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    pub compaction_interval_sec: Option<u64>,

    /// Whether to enable iceberg expired snapshots.
    #[serde(
        rename = "enable_snapshot_expiration",
        default,
        deserialize_with = "deserialize_bool_from_string"
    )]
    #[with_option(allow_alter_on_fly)]
    pub enable_snapshot_expiration: bool,

    /// The iceberg write mode, can be `merge-on-read` or `copy-on-write`.
    /// `copy-on-write` is rejected for append-only sinks.
    #[serde(rename = "write_mode", default = "default_iceberg_write_mode")]
    pub write_mode: IcebergWriteMode,

    /// Iceberg format version for table creation.
    /// Accepts 1, 2, or 3 (number or string); defaults to v2.
    #[serde(
        rename = "format_version",
        default = "default_iceberg_format_version",
        deserialize_with = "deserialize_format_version"
    )]
    pub format_version: FormatVersion,

    /// The maximum age (in milliseconds) for snapshots before they expire
    /// For example, if set to 3600000, snapshots older than 1 hour will be expired
    #[serde(rename = "snapshot_expiration_max_age_millis", default)]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    pub snapshot_expiration_max_age_millis: Option<i64>,

    /// The number of snapshots to retain
    #[serde(rename = "snapshot_expiration_retain_last", default)]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    pub snapshot_expiration_retain_last: Option<i32>,

    /// Whether expired data files are removed during snapshot expiration; default true.
    #[serde(
        rename = "snapshot_expiration_clear_expired_files",
        default = "default_true",
        deserialize_with = "deserialize_bool_from_string"
    )]
    #[with_option(allow_alter_on_fly)]
    pub snapshot_expiration_clear_expired_files: bool,

    /// Whether expired metadata is removed during snapshot expiration; default true.
    #[serde(
        rename = "snapshot_expiration_clear_expired_meta_data",
        default = "default_true",
        deserialize_with = "deserialize_bool_from_string"
    )]
    #[with_option(allow_alter_on_fly)]
    pub snapshot_expiration_clear_expired_meta_data: bool,

    /// The maximum number of snapshots allowed since the last rewrite operation
    /// If set, sink will check snapshot count and wait if exceeded
    #[serde(rename = "compaction.max_snapshots_num", default)]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    pub max_snapshots_num_before_compaction: Option<usize>,

    /// Small-files compaction threshold in MB;
    /// `small_files_threshold_mb()` defaults this to 64 when unset.
    #[serde(rename = "compaction.small_files_threshold_mb", default)]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    pub small_files_threshold_mb: Option<u64>,

    /// Delete-files count threshold for compaction;
    /// `delete_files_count_threshold()` defaults this to 256 when unset.
    #[serde(rename = "compaction.delete_files_count_threshold", default)]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    pub delete_files_count_threshold: Option<usize>,

    /// Snapshot-count trigger for compaction;
    /// `trigger_snapshot_count()` defaults this to `usize::MAX` when unset.
    #[serde(rename = "compaction.trigger_snapshot_count", default)]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    pub trigger_snapshot_count: Option<usize>,

    /// Compaction target file size in MB;
    /// `target_file_size_mb()` defaults this to 1024 when unset.
    #[serde(rename = "compaction.target_file_size_mb", default)]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    pub target_file_size_mb: Option<u64>,

    /// Compaction type: `full`, `small-files`, or `files-with-delete`
    /// If not set, will default to `full`
    #[serde(rename = "compaction.type", default)]
    #[with_option(allow_alter_on_fly)]
    pub compaction_type: Option<CompactionType>,

    /// Parquet compression codec
    /// Supported values: uncompressed, snappy, gzip, lzo, brotli, lz4, zstd
    /// When unset, `write_parquet_compression()` falls back to "zstd".
    #[serde(rename = "compaction.write_parquet_compression", default)]
    #[with_option(allow_alter_on_fly)]
    pub write_parquet_compression: Option<String>,

    /// Maximum number of rows in a Parquet row group
    /// Default is 122880 (from developer config)
    #[serde(rename = "compaction.write_parquet_max_row_group_rows", default)]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    pub write_parquet_max_row_group_rows: Option<usize>,
}
509
510impl EnforceSecret for IcebergConfig {
511    fn enforce_secret<'a>(
512        prop_iter: impl Iterator<Item = &'a str>,
513    ) -> crate::error::ConnectorResult<()> {
514        for prop in prop_iter {
515            IcebergCommon::enforce_one(prop)?;
516        }
517        Ok(())
518    }
519
520    fn enforce_one(prop: &str) -> crate::error::ConnectorResult<()> {
521        IcebergCommon::enforce_one(prop)
522    }
523}
524
impl IcebergConfig {
    /// Validate that append-only sinks use merge-on-read mode
    /// Copy-on-write is strictly worse than merge-on-read for append-only workloads
    fn validate_append_only_write_mode(
        sink_type: &str,
        write_mode: IcebergWriteMode,
    ) -> Result<()> {
        if sink_type == SINK_TYPE_APPEND_ONLY && write_mode == IcebergWriteMode::CopyOnWrite {
            return Err(SinkError::Config(anyhow!(
                "'copy-on-write' mode is not supported for append-only iceberg sink. \
                 Please use 'merge-on-read' instead, which is strictly better for append-only workloads."
            )));
        }
        Ok(())
    }

    /// Builds an `IcebergConfig` from the raw sink options.
    ///
    /// Validation performed here:
    /// - `type` must be "append-only" or "upsert";
    /// - upsert sinks must declare a non-empty `primary_key`;
    /// - append-only sinks may not use `copy-on-write`;
    /// - `commit_checkpoint_interval` must be > 0.
    ///
    /// All `catalog.*` options (except uri/type/name/header) are copied into
    /// `java_catalog_props` with the prefix stripped.
    pub fn from_btreemap(values: BTreeMap<String, String>) -> Result<Self> {
        // Round-trip through serde_json so flattened/typed fields are
        // deserialized with the derived `Deserialize` implementation.
        let mut config =
            serde_json::from_value::<IcebergConfig>(serde_json::to_value(&values).unwrap())
                .map_err(|e| SinkError::Config(anyhow!(e)))?;

        if config.r#type != SINK_TYPE_APPEND_ONLY && config.r#type != SINK_TYPE_UPSERT {
            return Err(SinkError::Config(anyhow!(
                "`{}` must be {}, or {}",
                SINK_TYPE_OPTION,
                SINK_TYPE_APPEND_ONLY,
                SINK_TYPE_UPSERT
            )));
        }

        if config.r#type == SINK_TYPE_UPSERT {
            if let Some(primary_key) = &config.primary_key {
                if primary_key.is_empty() {
                    return Err(SinkError::Config(anyhow!(
                        "`primary-key` must not be empty in {}",
                        SINK_TYPE_UPSERT
                    )));
                }
            } else {
                return Err(SinkError::Config(anyhow!(
                    "Must set `primary-key` in {}",
                    SINK_TYPE_UPSERT
                )));
            }
        }

        // Enforce merge-on-read for append-only sinks
        Self::validate_append_only_write_mode(&config.r#type, config.write_mode)?;

        // All configs start with "catalog." will be treated as java configs.
        config.java_catalog_props = values
            .iter()
            .filter(|(k, _v)| {
                k.starts_with("catalog.")
                    && k != &"catalog.uri"
                    && k != &"catalog.type"
                    && k != &"catalog.name"
                    && k != &"catalog.header"
            })
            // `k[8..]` strips the "catalog." prefix (8 bytes).
            .map(|(k, v)| (k[8..].to_string(), v.clone()))
            .collect();

        if config.commit_checkpoint_interval == 0 {
            return Err(SinkError::Config(anyhow!(
                "`commit-checkpoint-interval` must be greater than 0"
            )));
        }

        Ok(config)
    }

    /// Catalog type string, delegated to the common catalog options.
    pub fn catalog_type(&self) -> &str {
        self.common.catalog_type()
    }

    /// Loads the target table through the configured catalog.
    pub async fn load_table(&self) -> Result<Table> {
        self.common
            .load_table(&self.table, &self.java_catalog_props)
            .await
            .map_err(Into::into)
    }

    /// Builds a catalog client from the common options and java catalog props.
    pub async fn create_catalog(&self) -> Result<Arc<dyn Catalog>> {
        self.common
            .create_catalog(&self.java_catalog_props)
            .await
            .map_err(Into::into)
    }

    /// Full `TableIdent` of the target table.
    pub fn full_table_name(&self) -> Result<TableIdent> {
        self.table.to_table_ident().map_err(Into::into)
    }

    /// Name of the configured catalog.
    pub fn catalog_name(&self) -> String {
        self.common.catalog_name()
    }

    /// Format version used when creating the table.
    pub fn table_format_version(&self) -> FormatVersion {
        self.format_version
    }

    /// Compaction interval in seconds.
    pub fn compaction_interval_sec(&self) -> u64 {
        // default to 1 hour
        self.compaction_interval_sec.unwrap_or(3600)
    }

    /// Calculate the timestamp (in milliseconds) before which snapshots should be expired
    /// Returns `current_time_ms` - `max_age_millis`
    pub fn snapshot_expiration_timestamp_ms(&self, current_time_ms: i64) -> Option<i64> {
        self.snapshot_expiration_max_age_millis
            .map(|max_age_millis| current_time_ms - max_age_millis)
    }

    /// Snapshot-count trigger for compaction; `usize::MAX` when unset.
    pub fn trigger_snapshot_count(&self) -> usize {
        self.trigger_snapshot_count.unwrap_or(usize::MAX)
    }

    /// Small-files threshold in MB; defaults to 64.
    pub fn small_files_threshold_mb(&self) -> u64 {
        self.small_files_threshold_mb.unwrap_or(64)
    }

    /// Delete-files count threshold; defaults to 256.
    pub fn delete_files_count_threshold(&self) -> usize {
        self.delete_files_count_threshold.unwrap_or(256)
    }

    /// Compaction target file size in MB; defaults to 1024.
    pub fn target_file_size_mb(&self) -> u64 {
        self.target_file_size_mb.unwrap_or(1024)
    }

    /// Get the compaction type, defaulting to `CompactionType::Full` when unset.
    pub fn compaction_type(&self) -> CompactionType {
        self.compaction_type.unwrap_or_default()
    }

    /// Get the parquet compression codec
    /// Default is "zstd"
    pub fn write_parquet_compression(&self) -> &str {
        self.write_parquet_compression.as_deref().unwrap_or("zstd")
    }

    /// Get the maximum number of rows in a Parquet row group
    /// Default is 122880 (from developer config default)
    pub fn write_parquet_max_row_group_rows(&self) -> usize {
        self.write_parquet_max_row_group_rows.unwrap_or(122880)
    }

    /// Parse the configured compression codec into a Parquet `Compression`.
    /// When the option is unset this resolves to zstd (the default above);
    /// only unrecognized codec strings fall back to SNAPPY
    /// (see `parse_parquet_compression`).
    pub fn get_parquet_compression(&self) -> Compression {
        parse_parquet_compression(self.write_parquet_compression())
    }
}
678
679/// Parse compression codec string to Parquet Compression enum
680fn parse_parquet_compression(codec: &str) -> Compression {
681    match codec.to_lowercase().as_str() {
682        "uncompressed" => Compression::UNCOMPRESSED,
683        "snappy" => Compression::SNAPPY,
684        "gzip" => Compression::GZIP(Default::default()),
685        "lzo" => Compression::LZO,
686        "brotli" => Compression::BROTLI(Default::default()),
687        "lz4" => Compression::LZ4,
688        "zstd" => Compression::ZSTD(Default::default()),
689        _ => {
690            tracing::warn!(
691                "Unknown compression codec '{}', falling back to SNAPPY",
692                codec
693            );
694            Compression::SNAPPY
695        }
696    }
697}
698
/// The Iceberg sink: holds the parsed config, the originating sink params,
/// and the column ids used as the upsert key.
pub struct IcebergSink {
    pub config: IcebergConfig,
    param: SinkParam,
    // In upsert mode, this is never `None` or empty.
    unique_column_ids: Option<Vec<usize>>,
}
705
706impl EnforceSecret for IcebergSink {
707    fn enforce_secret<'a>(
708        prop_iter: impl Iterator<Item = &'a str>,
709    ) -> crate::error::ConnectorResult<()> {
710        for prop in prop_iter {
711            IcebergConfig::enforce_one(prop)?;
712        }
713        Ok(())
714    }
715}
716
717impl TryFrom<SinkParam> for IcebergSink {
718    type Error = SinkError;
719
720    fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
721        let config = IcebergConfig::from_btreemap(param.properties.clone())?;
722        IcebergSink::new(config, param)
723    }
724}
725
726impl Debug for IcebergSink {
727    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
728        f.debug_struct("IcebergSink")
729            .field("config", &self.config)
730            .finish()
731    }
732}
733
734async fn create_and_validate_table_impl(
735    config: &IcebergConfig,
736    param: &SinkParam,
737) -> Result<Table> {
738    if config.create_table_if_not_exists {
739        create_table_if_not_exists_impl(config, param).await?;
740    }
741
742    let table = config
743        .load_table()
744        .await
745        .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
746
747    let sink_schema = param.schema();
748    let iceberg_arrow_schema = schema_to_arrow_schema(table.metadata().current_schema())
749        .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
750
751    try_matches_arrow_schema(&sink_schema, &iceberg_arrow_schema)
752        .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
753
754    Ok(table)
755}
756
757async fn create_table_if_not_exists_impl(config: &IcebergConfig, param: &SinkParam) -> Result<()> {
758    let catalog = config.create_catalog().await?;
759    let namespace = if let Some(database_name) = config.table.database_name() {
760        let namespace = NamespaceIdent::new(database_name.to_owned());
761        if !catalog
762            .namespace_exists(&namespace)
763            .await
764            .map_err(|e| SinkError::Iceberg(anyhow!(e)))?
765        {
766            catalog
767                .create_namespace(&namespace, HashMap::default())
768                .await
769                .map_err(|e| SinkError::Iceberg(anyhow!(e)))
770                .context("failed to create iceberg namespace")?;
771        }
772        namespace
773    } else {
774        bail!("database name must be set if you want to create table")
775    };
776
777    let table_id = config
778        .full_table_name()
779        .context("Unable to parse table name")?;
780    if !catalog
781        .table_exists(&table_id)
782        .await
783        .map_err(|e| SinkError::Iceberg(anyhow!(e)))?
784    {
785        let iceberg_create_table_arrow_convert = IcebergCreateTableArrowConvert::default();
786        // convert risingwave schema -> arrow schema -> iceberg schema
787        let arrow_fields = param
788            .columns
789            .iter()
790            .map(|column| {
791                Ok(iceberg_create_table_arrow_convert
792                    .to_arrow_field(&column.name, &column.data_type)
793                    .map_err(|e| SinkError::Iceberg(anyhow!(e)))
794                    .context(format!(
795                        "failed to convert {}: {} to arrow type",
796                        &column.name, &column.data_type
797                    ))?)
798            })
799            .collect::<Result<Vec<ArrowField>>>()?;
800        let arrow_schema = arrow_schema_iceberg::Schema::new(arrow_fields);
801        let iceberg_schema = iceberg::arrow::arrow_schema_to_schema(&arrow_schema)
802            .map_err(|e| SinkError::Iceberg(anyhow!(e)))
803            .context("failed to convert arrow schema to iceberg schema")?;
804
805        let location = {
806            let mut names = namespace.clone().inner();
807            names.push(config.table.table_name().to_owned());
808            match &config.common.warehouse_path {
809                Some(warehouse_path) => {
810                    let is_s3_tables = warehouse_path.starts_with("arn:aws:s3tables");
811                    // BigLake catalog federation uses bq:// prefix for BigQuery-managed Iceberg tables
812                    let is_bq_catalog_federation = warehouse_path.starts_with("bq://");
813                    let url = Url::parse(warehouse_path);
814                    if url.is_err() || is_s3_tables || is_bq_catalog_federation {
815                        // For rest catalog, the warehouse_path could be a warehouse name.
816                        // In this case, we should specify the location when creating a table.
817                        if config.common.catalog_type() == "rest"
818                            || config.common.catalog_type() == "rest_rust"
819                        {
820                            None
821                        } else {
822                            bail!(format!("Invalid warehouse path: {}", warehouse_path))
823                        }
824                    } else if warehouse_path.ends_with('/') {
825                        Some(format!("{}{}", warehouse_path, names.join("/")))
826                    } else {
827                        Some(format!("{}/{}", warehouse_path, names.join("/")))
828                    }
829                }
830                None => None,
831            }
832        };
833
834        let partition_spec = match &config.partition_by {
835            Some(partition_by) => {
836                let mut partition_fields = Vec::<UnboundPartitionField>::new();
837                for (i, (column, transform)) in parse_partition_by_exprs(partition_by.clone())?
838                    .into_iter()
839                    .enumerate()
840                {
841                    match iceberg_schema.field_id_by_name(&column) {
842                        Some(id) => partition_fields.push(
843                            UnboundPartitionField::builder()
844                                .source_id(id)
845                                .transform(transform)
846                                .name(format!("_p_{}", column))
847                                .field_id(PARTITION_DATA_ID_START + i as i32)
848                                .build(),
849                        ),
850                        None => bail!(format!(
851                            "Partition source column does not exist in schema: {}",
852                            column
853                        )),
854                    };
855                }
856                Some(
857                    UnboundPartitionSpec::builder()
858                        .with_spec_id(0)
859                        .add_partition_fields(partition_fields)
860                        .map_err(|e| SinkError::Iceberg(anyhow!(e)))
861                        .context("failed to add partition columns")?
862                        .build(),
863                )
864            }
865            None => None,
866        };
867
868        // Put format-version into table properties, because catalog like jdbc extract format-version from table properties.
869        let properties = HashMap::from([(
870            TableProperties::PROPERTY_FORMAT_VERSION.to_owned(),
871            (config.format_version as u8).to_string(),
872        )]);
873
874        let table_creation_builder = TableCreation::builder()
875            .name(config.table.table_name().to_owned())
876            .schema(iceberg_schema)
877            .format_version(config.table_format_version())
878            .properties(properties);
879
880        let table_creation = match (location, partition_spec) {
881            (Some(location), Some(partition_spec)) => table_creation_builder
882                .location(location)
883                .partition_spec(partition_spec)
884                .build(),
885            (Some(location), None) => table_creation_builder.location(location).build(),
886            (None, Some(partition_spec)) => table_creation_builder
887                .partition_spec(partition_spec)
888                .build(),
889            (None, None) => table_creation_builder.build(),
890        };
891
892        catalog
893            .create_table(&namespace, table_creation)
894            .await
895            .map_err(|e| SinkError::Iceberg(anyhow!(e)))
896            .context("failed to create iceberg table")?;
897    }
898    Ok(())
899}
900
901impl IcebergSink {
902    pub async fn create_and_validate_table(&self) -> Result<Table> {
903        create_and_validate_table_impl(&self.config, &self.param).await
904    }
905
906    pub async fn create_table_if_not_exists(&self) -> Result<()> {
907        create_table_if_not_exists_impl(&self.config, &self.param).await
908    }
909
910    pub fn new(config: IcebergConfig, param: SinkParam) -> Result<Self> {
911        let unique_column_ids = if config.r#type == SINK_TYPE_UPSERT && !config.force_append_only {
912            if let Some(pk) = &config.primary_key {
913                let mut unique_column_ids = Vec::with_capacity(pk.len());
914                for col_name in pk {
915                    let id = param
916                        .columns
917                        .iter()
918                        .find(|col| col.name.as_str() == col_name)
919                        .ok_or_else(|| {
920                            SinkError::Config(anyhow!(
921                                "Primary key column {} not found in sink schema",
922                                col_name
923                            ))
924                        })?
925                        .column_id
926                        .get_id() as usize;
927                    unique_column_ids.push(id);
928                }
929                Some(unique_column_ids)
930            } else {
931                unreachable!()
932            }
933        } else {
934            None
935        };
936        Ok(Self {
937            config,
938            param,
939            unique_column_ids,
940        })
941    }
942}
943
impl Sink for IcebergSink {
    type LogSinker = CoordinatedLogSinker<IcebergSinkWriter>;

    const SINK_NAME: &'static str = ICEBERG_SINK;

    /// Validate the sink configuration before the sink starts: catalog
    /// support, license gating, write-mode / compaction-type compatibility,
    /// and finally that the iceberg table itself can be created or loaded
    /// with a schema matching the sink.
    async fn validate(&self) -> Result<()> {
        // The Snowflake catalog cannot be written through this sink.
        if "snowflake".eq_ignore_ascii_case(self.config.catalog_type()) {
            bail!("Snowflake catalog only supports iceberg sources");
        }

        // Sinking through AWS Glue is a licensed feature.
        if "glue".eq_ignore_ascii_case(self.config.catalog_type()) {
            risingwave_common::license::Feature::IcebergSinkWithGlue
                .check_available()
                .map_err(|e| anyhow::anyhow!(e))?;
        }

        // Enforce merge-on-read for append-only tables
        IcebergConfig::validate_append_only_write_mode(
            &self.config.r#type,
            self.config.write_mode,
        )?;

        // Validate compaction type configuration
        let compaction_type = self.config.compaction_type();

        // Check COW mode constraints
        // COW mode only supports 'full' compaction type
        if self.config.write_mode == IcebergWriteMode::CopyOnWrite
            && compaction_type != CompactionType::Full
        {
            bail!(
                "'copy-on-write' mode only supports 'full' compaction type, got: '{}'",
                compaction_type
            );
        }

        match compaction_type {
            CompactionType::SmallFiles => {
                // 1. check license
                risingwave_common::license::Feature::IcebergCompaction
                    .check_available()
                    .map_err(|e| anyhow::anyhow!(e))?;

                // 2. check write mode
                if self.config.write_mode != IcebergWriteMode::MergeOnRead {
                    bail!(
                        "'small-files' compaction type only supports 'merge-on-read' write mode, got: '{}'",
                        self.config.write_mode
                    );
                }

                // 3. check conflicting parameters: this threshold only applies
                // to compaction types that consider delete files.
                if self.config.delete_files_count_threshold.is_some() {
                    bail!(
                        "`compaction.delete-files-count-threshold` is not supported for 'small-files' compaction type"
                    );
                }
            }
            CompactionType::FilesWithDelete => {
                // 1. check license
                risingwave_common::license::Feature::IcebergCompaction
                    .check_available()
                    .map_err(|e| anyhow::anyhow!(e))?;

                // 2. check write mode
                if self.config.write_mode != IcebergWriteMode::MergeOnRead {
                    bail!(
                        "'files-with-delete' compaction type only supports 'merge-on-read' write mode, got: '{}'",
                        self.config.write_mode
                    );
                }

                // 3. check conflicting parameters: the small-files threshold
                // belongs to the 'small-files' compaction type.
                if self.config.small_files_threshold_mb.is_some() {
                    bail!(
                        "`compaction.small-files-threshold-mb` must not be set for 'files-with-delete' compaction type"
                    );
                }
            }
            CompactionType::Full => {
                // Full compaction has no special requirements
            }
        }

        // Exercises catalog connectivity and schema compatibility; the table
        // handle itself is not needed here.
        let _ = self.create_and_validate_table().await?;
        Ok(())
    }

    fn support_schema_change() -> bool {
        true
    }

    /// Validate a proposed `ALTER SINK` config change without applying it.
    /// Only the compaction- and parquet-related knobs are checked here; the
    /// rest is validated while parsing via `IcebergConfig::from_btreemap`.
    fn validate_alter_config(config: &BTreeMap<String, String>) -> Result<()> {
        let iceberg_config = IcebergConfig::from_btreemap(config.clone())?;

        // Validate compaction interval: a zero interval is only an error when
        // compaction is actually enabled.
        if let Some(compaction_interval) = iceberg_config.compaction_interval_sec {
            if iceberg_config.enable_compaction && compaction_interval == 0 {
                bail!(
                    "`compaction-interval-sec` must be greater than 0 when `enable-compaction` is true"
                );
            }

            // log compaction_interval
            tracing::info!(
                "Alter config compaction_interval set to {} seconds",
                compaction_interval
            );
        }

        // Validate max snapshots
        if let Some(max_snapshots) = iceberg_config.max_snapshots_num_before_compaction
            && max_snapshots < 1
        {
            bail!(
                "`compaction.max-snapshots-num` must be greater than 0, got: {}",
                max_snapshots
            );
        }

        // Validate target file size
        if let Some(target_file_size_mb) = iceberg_config.target_file_size_mb
            && target_file_size_mb == 0
        {
            bail!("`compaction.target_file_size_mb` must be greater than 0");
        }

        // Validate parquet max row group rows
        if let Some(max_row_group_rows) = iceberg_config.write_parquet_max_row_group_rows
            && max_row_group_rows == 0
        {
            bail!("`compaction.write_parquet_max_row_group_rows` must be greater than 0");
        }

        // Validate parquet compression codec (case-insensitive).
        if let Some(ref compression) = iceberg_config.write_parquet_compression {
            let valid_codecs = [
                "uncompressed",
                "snappy",
                "gzip",
                "lzo",
                "brotli",
                "lz4",
                "zstd",
            ];
            if !valid_codecs.contains(&compression.to_lowercase().as_str()) {
                bail!(
                    "`compaction.write_parquet_compression` must be one of {:?}, got: {}",
                    valid_codecs,
                    compression
                );
            }
        }

        Ok(())
    }

    /// Create the log sinker that feeds stream chunks to the iceberg writer,
    /// committing once every `commit_checkpoint_interval` checkpoints.
    async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
        let writer = IcebergSinkWriter::new(
            self.config.clone(),
            self.param.clone(),
            writer_param.clone(),
            self.unique_column_ids.clone(),
        );

        let commit_checkpoint_interval =
            NonZeroU64::new(self.config.commit_checkpoint_interval).expect(
                "commit_checkpoint_interval should be greater than 0, and it should be checked in config validation",
            );
        let log_sinker = CoordinatedLogSinker::new(
            &writer_param,
            self.param.clone(),
            writer,
            commit_checkpoint_interval,
        )
        .await?;

        Ok(log_sinker)
    }

    fn is_coordinated_sink(&self) -> bool {
        true
    }

    /// Build the commit coordinator that turns per-writer results into
    /// iceberg table commits. A two-phase coordinator is used when
    /// `is-exactly-once` is enabled, a single-phase one otherwise.
    async fn new_coordinator(
        &self,
        iceberg_compact_stat_sender: Option<UnboundedSender<IcebergSinkCompactionUpdate>>,
    ) -> Result<SinkCommitCoordinator> {
        let catalog = self.config.create_catalog().await?;
        let table = self.create_and_validate_table().await?;
        let coordinator = IcebergSinkCommitter {
            catalog,
            table,
            last_commit_epoch: 0,
            sink_id: self.param.sink_id,
            config: self.config.clone(),
            param: self.param.clone(),
            commit_retry_num: self.config.commit_retry_num,
            iceberg_compact_stat_sender,
        };
        if self.config.is_exactly_once.unwrap_or_default() {
            Ok(SinkCommitCoordinator::TwoPhase(Box::new(coordinator)))
        } else {
            Ok(SinkCommitCoordinator::SinglePhase(Box::new(coordinator)))
        }
    }
}
1151
/// Projection state for dropping the extra partition column from a chunk
/// before writing.
///
/// - `None` means no projection is needed.
/// - `Prepare` carries the index of the extra partition column.
/// - `Done` carries the resolved projection index vector.
///
/// The `ProjectIdxVec` is evaluated lazily: the first time the `Prepare`
/// state is encountered, the data chunk's schema is used to build the
/// projection index vector.
enum ProjectIdxVec {
    None,
    Prepare(usize),
    Done(Vec<usize>),
}
1163
/// Data-file writer stack: parquet writer driven through the rolling-file and
/// data-file layers with default location/name generators.
type DataFileWriterBuilderType =
    DataFileWriterBuilder<ParquetWriterBuilder, DefaultLocationGenerator, DefaultFileNameGenerator>;
/// Builder for parquet-based position-delete files.
type PositionDeleteFileWriterBuilderType = PositionDeleteFileWriterBuilder<
    ParquetWriterBuilder,
    DefaultLocationGenerator,
    DefaultFileNameGenerator,
>;
/// Writer counterpart of `PositionDeleteFileWriterBuilderType`.
type PositionDeleteFileWriterType = PositionDeleteFileWriter<
    ParquetWriterBuilder,
    DefaultLocationGenerator,
    DefaultFileNameGenerator,
>;
/// Builder for deletion-vector output (used for format-v3 tables; see `build_upsert`).
type DeletionVectorWriterBuilderType =
    DeletionVectorWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>;
/// Writer counterpart of `DeletionVectorWriterBuilderType`.
type DeletionVectorWriterType =
    DeletionVectorWriter<DefaultLocationGenerator, DefaultFileNameGenerator>;
/// Builder for equality-delete files keyed by the sink's primary-key column ids.
type EqualityDeleteFileWriterBuilderType = EqualityDeleteFileWriterBuilder<
    ParquetWriterBuilder,
    DefaultLocationGenerator,
    DefaultFileNameGenerator,
>;
1185
/// Builder for either flavor of position-delete output: classic
/// position-delete files, or deletion vectors (selected by table format
/// version in `build_upsert`).
#[derive(Clone)]
enum PositionDeleteWriterBuilderType {
    PositionDelete(PositionDeleteFileWriterBuilderType),
    DeletionVector(DeletionVectorWriterBuilderType),
}
1191
/// Writer counterpart of `PositionDeleteWriterBuilderType`; dispatches to the
/// matching concrete writer.
enum PositionDeleteWriterType {
    PositionDelete(PositionDeleteFileWriterType),
    DeletionVector(DeletionVectorWriterType),
}
1196
1197#[async_trait]
1198impl IcebergWriterBuilder<Vec<PositionDeleteInput>> for PositionDeleteWriterBuilderType {
1199    type R = PositionDeleteWriterType;
1200
1201    async fn build(
1202        self,
1203        partition_key: Option<iceberg::spec::PartitionKey>,
1204    ) -> iceberg::Result<Self::R> {
1205        match self {
1206            PositionDeleteWriterBuilderType::PositionDelete(builder) => Ok(
1207                PositionDeleteWriterType::PositionDelete(builder.build(partition_key).await?),
1208            ),
1209            PositionDeleteWriterBuilderType::DeletionVector(builder) => Ok(
1210                PositionDeleteWriterType::DeletionVector(builder.build(partition_key).await?),
1211            ),
1212        }
1213    }
1214}
1215
1216#[async_trait]
1217impl IcebergWriter<Vec<PositionDeleteInput>> for PositionDeleteWriterType {
1218    async fn write(&mut self, input: Vec<PositionDeleteInput>) -> iceberg::Result<()> {
1219        match self {
1220            PositionDeleteWriterType::PositionDelete(writer) => writer.write(input).await,
1221            PositionDeleteWriterType::DeletionVector(writer) => writer.write(input).await,
1222        }
1223    }
1224
1225    async fn close(&mut self) -> iceberg::Result<Vec<DataFile>> {
1226        match self {
1227            PositionDeleteWriterType::PositionDelete(writer) => writer.close().await,
1228            PositionDeleteWriterType::DeletionVector(writer) => writer.close().await,
1229        }
1230    }
1231}
1232
/// Wraps an `IcebergWriterBuilder` together with the information required to
/// construct a partition-aware `TaskWriter` from it.
#[derive(Clone)]
struct TaskWriterBuilderWrapper<B: IcebergWriterBuilder> {
    // The wrapped writer builder.
    inner: B,
    // Whether writes fan out per partition (set by callers when the
    // partition spec has fields).
    fanout_enabled: bool,
    // Iceberg schema of the table being written.
    schema: IcebergSchemaRef,
    // Partition spec used to split incoming record batches.
    partition_spec: PartitionSpecRef,
    // Whether partition values must be computed from row data
    // (vs. precomputed values already present in the input).
    compute_partition: bool,
}
1241
1242impl<B: IcebergWriterBuilder> TaskWriterBuilderWrapper<B> {
1243    fn new(
1244        inner: B,
1245        fanout_enabled: bool,
1246        schema: IcebergSchemaRef,
1247        partition_spec: PartitionSpecRef,
1248        compute_partition: bool,
1249    ) -> Self {
1250        Self {
1251            inner,
1252            fanout_enabled,
1253            schema,
1254            partition_spec,
1255            compute_partition,
1256        }
1257    }
1258
1259    fn build(self) -> iceberg::Result<TaskWriter<B>> {
1260        let partition_splitter = match (
1261            self.partition_spec.is_unpartitioned(),
1262            self.compute_partition,
1263        ) {
1264            (true, _) => None,
1265            (false, true) => Some(RecordBatchPartitionSplitter::new_with_computed_values(
1266                self.schema.clone(),
1267                self.partition_spec.clone(),
1268            )?),
1269            (false, false) => Some(RecordBatchPartitionSplitter::new_with_precomputed_values(
1270                self.schema.clone(),
1271                self.partition_spec.clone(),
1272            )?),
1273        };
1274
1275        Ok(TaskWriter::new_with_partition_splitter(
1276            self.inner,
1277            self.fanout_enabled,
1278            self.schema,
1279            self.partition_spec,
1280            partition_splitter,
1281        ))
1282    }
1283}
1284
/// Iceberg sink writer as a two-state machine: first constructed with its raw
/// arguments (`Created`), later turned into a fully-built writer
/// (`Initialized`).
pub enum IcebergSinkWriter {
    Created(IcebergSinkWriterArgs),
    Initialized(IcebergSinkWriterInner),
}
1289
/// Arguments captured at sink-writer creation time, consumed when the writer
/// is initialized.
pub struct IcebergSinkWriterArgs {
    config: IcebergConfig,
    sink_param: SinkParam,
    writer_param: SinkWriterParam,
    unique_column_ids: Option<Vec<usize>>,
}
1296
/// Fully-initialized writer state behind `IcebergSinkWriter::Initialized`.
pub struct IcebergSinkWriterInner {
    // Dispatches between the append-only and upsert writer pipelines.
    writer: IcebergWriterDispatch,
    // Arrow representation of the table's current iceberg schema.
    arrow_schema: SchemaRef,
    // Metric handles; see `IcebergWriterMetrics` for why some fields are
    // kept alive without being read.
    metrics: IcebergWriterMetrics,
    // State of iceberg table for this writer
    table: Table,
    // For chunk with extra partition column, we should remove this column before write.
    // This project index vec is used to avoid create project idx each time.
    project_idx_vec: ProjectIdxVec,
}
1308
/// The concrete writer pipeline, selected by sink type.
#[allow(clippy::type_complexity)]
enum IcebergWriterDispatch {
    /// Append-only sinks: a plain data-file writer.
    Append {
        // Built from `writer_builder`; `None` when no writer is active.
        writer: Option<Box<dyn IcebergWriter>>,
        writer_builder:
            TaskWriterBuilderWrapper<MonitoredGeneralWriterBuilder<DataFileWriterBuilderType>>,
    },
    /// Upsert sinks: a delta writer combining data, position-delete, and
    /// equality-delete writers.
    Upsert {
        // Built from `writer_builder`; `None` when no writer is active.
        writer: Option<Box<dyn IcebergWriter>>,
        writer_builder: TaskWriterBuilderWrapper<
            MonitoredGeneralWriterBuilder<
                DeltaWriterBuilder<
                    DataFileWriterBuilderType,
                    PositionDeleteWriterBuilderType,
                    EqualityDeleteFileWriterBuilderType,
                >,
            >,
        >,
        /// Arrow schema extended with the trailing `op` (Int32) column that
        /// carries the insert/delete operation per row.
        arrow_schema_with_op_column: SchemaRef,
    },
}
1330
1331impl IcebergWriterDispatch {
1332    pub fn get_writer(&mut self) -> Option<&mut Box<dyn IcebergWriter>> {
1333        match self {
1334            IcebergWriterDispatch::Append { writer, .. }
1335            | IcebergWriterDispatch::Upsert { writer, .. } => writer.as_mut(),
1336        }
1337    }
1338}
1339
/// Metric handles owned by the iceberg writer.
pub struct IcebergWriterMetrics {
    // NOTE: These 2 metrics are not used directly by us, but only kept for lifecycle management.
    // They are actually used in `PrometheusWriterBuilder`:
    //     WriterMetrics::new(write_qps.deref().clone(), write_latency.deref().clone())
    // We keep them here to let the guard clean the labels from the metrics registry when dropped.
    _write_qps: LabelGuardedIntCounter,
    _write_latency: LabelGuardedHistogram,
    // Bytes-written counter, unlike the fields above this one is updated
    // directly by the writer.
    write_bytes: LabelGuardedIntCounter,
}
1349
1350impl IcebergSinkWriter {
1351    pub fn new(
1352        config: IcebergConfig,
1353        sink_param: SinkParam,
1354        writer_param: SinkWriterParam,
1355        unique_column_ids: Option<Vec<usize>>,
1356    ) -> Self {
1357        Self::Created(IcebergSinkWriterArgs {
1358            config,
1359            sink_param,
1360            writer_param,
1361            unique_column_ids,
1362        })
1363    }
1364}
1365
1366impl IcebergSinkWriterInner {
    /// Build the writer state for an append-only sink: a single data-file
    /// pipeline (parquet → rolling file → data-file writer) wrapped with
    /// write metrics and a partition-aware task writer.
    fn build_append_only(
        config: &IcebergConfig,
        table: Table,
        writer_param: &SinkWriterParam,
    ) -> Result<Self> {
        let SinkWriterParam {
            extra_partition_col_idx,
            actor_id,
            sink_id,
            sink_name,
            ..
        } = writer_param;
        // Metric labels: (actor id, sink id, sink name).
        let metrics_labels = [
            &actor_id.to_string(),
            &sink_id.to_string(),
            sink_name.as_str(),
        ];

        // Metrics
        let write_qps = GLOBAL_SINK_METRICS
            .iceberg_write_qps
            .with_guarded_label_values(&metrics_labels);
        let write_latency = GLOBAL_SINK_METRICS
            .iceberg_write_latency
            .with_guarded_label_values(&metrics_labels);
        // # TODO
        // Unused. Add this metrics later.
        let _rolling_unflushed_data_file = GLOBAL_SINK_METRICS
            .iceberg_rolling_unflushed_data_file
            .with_guarded_label_values(&metrics_labels);
        let write_bytes = GLOBAL_SINK_METRICS
            .iceberg_write_bytes
            .with_guarded_label_values(&metrics_labels);

        let schema = table.metadata().current_schema();
        let partition_spec = table.metadata().default_partition_spec();
        // Fan out per partition only when the table is actually partitioned.
        let fanout_enabled = !partition_spec.fields().is_empty();
        // To avoid duplicate file name, each time the sink created will generate a unique uuid as file name suffix.
        let unique_uuid_suffix = Uuid::now_v7();

        let parquet_writer_properties = WriterProperties::builder()
            .set_compression(config.get_parquet_compression())
            .set_max_row_group_size(config.write_parquet_max_row_group_rows())
            .set_created_by(PARQUET_CREATED_BY.to_owned())
            .build();

        let parquet_writer_builder =
            ParquetWriterBuilder::new(parquet_writer_properties, schema.clone());
        // Data files roll over once they reach the configured target size (MB → bytes).
        let rolling_builder = RollingFileWriterBuilder::new(
            parquet_writer_builder,
            (config.target_file_size_mb() * 1024 * 1024) as usize,
            table.file_io().clone(),
            DefaultLocationGenerator::new(table.metadata().clone())
                .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
            DefaultFileNameGenerator::new(
                writer_param.actor_id.to_string(),
                Some(unique_uuid_suffix.to_string()),
                iceberg::spec::DataFileFormat::Parquet,
            ),
        );
        let data_file_builder = DataFileWriterBuilder::new(rolling_builder);
        // Wrap with QPS/latency instrumentation.
        let monitored_builder = MonitoredGeneralWriterBuilder::new(
            data_file_builder,
            write_qps.clone(),
            write_latency.clone(),
        );
        let writer_builder = TaskWriterBuilderWrapper::new(
            monitored_builder,
            fanout_enabled,
            schema.clone(),
            partition_spec.clone(),
            true,
        );
        // Eagerly build the first writer from a clone of the builder; the
        // builder itself is retained for rebuilding later.
        let inner_writer = Some(Box::new(
            writer_builder
                .clone()
                .build()
                .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
        ) as Box<dyn IcebergWriter>);
        Ok(Self {
            arrow_schema: Arc::new(
                schema_to_arrow_schema(table.metadata().current_schema())
                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
            ),
            metrics: IcebergWriterMetrics {
                _write_qps: write_qps,
                _write_latency: write_latency,
                write_bytes,
            },
            writer: IcebergWriterDispatch::Append {
                writer: inner_writer,
                writer_builder,
            },
            table,
            project_idx_vec: {
                if let Some(extra_partition_col_idx) = extra_partition_col_idx {
                    ProjectIdxVec::Prepare(*extra_partition_col_idx)
                } else {
                    ProjectIdxVec::None
                }
            },
        })
    }
1470
1471    fn build_upsert(
1472        config: &IcebergConfig,
1473        table: Table,
1474        unique_column_ids: Vec<usize>,
1475        writer_param: &SinkWriterParam,
1476    ) -> Result<Self> {
1477        let SinkWriterParam {
1478            extra_partition_col_idx,
1479            actor_id,
1480            sink_id,
1481            sink_name,
1482            ..
1483        } = writer_param;
1484        let metrics_labels = [
1485            &actor_id.to_string(),
1486            &sink_id.to_string(),
1487            sink_name.as_str(),
1488        ];
1489        let unique_column_ids: Vec<_> = unique_column_ids.into_iter().map(|id| id as i32).collect();
1490
1491        // Metrics
1492        let write_qps = GLOBAL_SINK_METRICS
1493            .iceberg_write_qps
1494            .with_guarded_label_values(&metrics_labels);
1495        let write_latency = GLOBAL_SINK_METRICS
1496            .iceberg_write_latency
1497            .with_guarded_label_values(&metrics_labels);
1498        // # TODO
1499        // Unused. Add this metrics later.
1500        let _rolling_unflushed_data_file = GLOBAL_SINK_METRICS
1501            .iceberg_rolling_unflushed_data_file
1502            .with_guarded_label_values(&metrics_labels);
1503        let write_bytes = GLOBAL_SINK_METRICS
1504            .iceberg_write_bytes
1505            .with_guarded_label_values(&metrics_labels);
1506
1507        // Determine the schema id and partition spec id
1508        let schema = table.metadata().current_schema();
1509        let partition_spec = table.metadata().default_partition_spec();
1510        let fanout_enabled = !partition_spec.fields().is_empty();
1511        let use_deletion_vectors = table.metadata().format_version() >= FormatVersion::V3;
1512
1513        // To avoid duplicate file name, each time the sink created will generate a unique uuid as file name suffix.
1514        let unique_uuid_suffix = Uuid::now_v7();
1515
1516        let parquet_writer_properties = WriterProperties::builder()
1517            .set_compression(config.get_parquet_compression())
1518            .set_max_row_group_size(config.write_parquet_max_row_group_rows())
1519            .set_created_by(PARQUET_CREATED_BY.to_owned())
1520            .build();
1521
1522        let data_file_builder = {
1523            let parquet_writer_builder =
1524                ParquetWriterBuilder::new(parquet_writer_properties.clone(), schema.clone());
1525            let rolling_writer_builder = RollingFileWriterBuilder::new(
1526                parquet_writer_builder,
1527                (config.target_file_size_mb() * 1024 * 1024) as usize,
1528                table.file_io().clone(),
1529                DefaultLocationGenerator::new(table.metadata().clone())
1530                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1531                DefaultFileNameGenerator::new(
1532                    writer_param.actor_id.to_string(),
1533                    Some(unique_uuid_suffix.to_string()),
1534                    iceberg::spec::DataFileFormat::Parquet,
1535                ),
1536            );
1537            DataFileWriterBuilder::new(rolling_writer_builder)
1538        };
1539        let position_delete_builder = if use_deletion_vectors {
1540            let location_generator = DefaultLocationGenerator::new(table.metadata().clone())
1541                .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
1542            PositionDeleteWriterBuilderType::DeletionVector(DeletionVectorWriterBuilder::new(
1543                table.file_io().clone(),
1544                location_generator,
1545                DefaultFileNameGenerator::new(
1546                    writer_param.actor_id.to_string(),
1547                    Some(format!("delvec-{}", unique_uuid_suffix)),
1548                    iceberg::spec::DataFileFormat::Puffin,
1549                ),
1550            ))
1551        } else {
1552            let parquet_writer_builder = ParquetWriterBuilder::new(
1553                parquet_writer_properties.clone(),
1554                POSITION_DELETE_SCHEMA.clone().into(),
1555            );
1556            let rolling_writer_builder = RollingFileWriterBuilder::new(
1557                parquet_writer_builder,
1558                (config.target_file_size_mb() * 1024 * 1024) as usize,
1559                table.file_io().clone(),
1560                DefaultLocationGenerator::new(table.metadata().clone())
1561                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1562                DefaultFileNameGenerator::new(
1563                    writer_param.actor_id.to_string(),
1564                    Some(format!("pos-del-{}", unique_uuid_suffix)),
1565                    iceberg::spec::DataFileFormat::Parquet,
1566                ),
1567            );
1568            PositionDeleteWriterBuilderType::PositionDelete(PositionDeleteFileWriterBuilder::new(
1569                rolling_writer_builder,
1570            ))
1571        };
1572        let equality_delete_builder = {
1573            let eq_del_config = EqualityDeleteWriterConfig::new(
1574                unique_column_ids.clone(),
1575                table.metadata().current_schema().clone(),
1576            )
1577            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
1578            let parquet_writer_builder = ParquetWriterBuilder::new(
1579                parquet_writer_properties,
1580                Arc::new(
1581                    arrow_schema_to_schema(eq_del_config.projected_arrow_schema_ref())
1582                        .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1583                ),
1584            );
1585            let rolling_writer_builder = RollingFileWriterBuilder::new(
1586                parquet_writer_builder,
1587                (config.target_file_size_mb() * 1024 * 1024) as usize,
1588                table.file_io().clone(),
1589                DefaultLocationGenerator::new(table.metadata().clone())
1590                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1591                DefaultFileNameGenerator::new(
1592                    writer_param.actor_id.to_string(),
1593                    Some(format!("eq-del-{}", unique_uuid_suffix)),
1594                    iceberg::spec::DataFileFormat::Parquet,
1595                ),
1596            );
1597
1598            EqualityDeleteFileWriterBuilder::new(rolling_writer_builder, eq_del_config)
1599        };
1600        let delta_builder = DeltaWriterBuilder::new(
1601            data_file_builder,
1602            position_delete_builder,
1603            equality_delete_builder,
1604            unique_column_ids,
1605            schema.clone(),
1606        );
1607        let original_arrow_schema = Arc::new(
1608            schema_to_arrow_schema(table.metadata().current_schema())
1609                .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1610        );
1611        let schema_with_extra_op_column = {
1612            let mut new_fields = original_arrow_schema.fields().iter().cloned().collect_vec();
1613            new_fields.push(Arc::new(ArrowField::new(
1614                "op".to_owned(),
1615                ArrowDataType::Int32,
1616                false,
1617            )));
1618            Arc::new(ArrowSchema::new(new_fields))
1619        };
1620        let writer_builder = TaskWriterBuilderWrapper::new(
1621            MonitoredGeneralWriterBuilder::new(
1622                delta_builder,
1623                write_qps.clone(),
1624                write_latency.clone(),
1625            ),
1626            fanout_enabled,
1627            schema.clone(),
1628            partition_spec.clone(),
1629            true,
1630        );
1631        let inner_writer = Some(Box::new(
1632            writer_builder
1633                .clone()
1634                .build()
1635                .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1636        ) as Box<dyn IcebergWriter>);
1637        Ok(Self {
1638            arrow_schema: original_arrow_schema,
1639            metrics: IcebergWriterMetrics {
1640                _write_qps: write_qps,
1641                _write_latency: write_latency,
1642                write_bytes,
1643            },
1644            table,
1645            writer: IcebergWriterDispatch::Upsert {
1646                writer: inner_writer,
1647                writer_builder,
1648                arrow_schema_with_op_column: schema_with_extra_op_column,
1649            },
1650            project_idx_vec: {
1651                if let Some(extra_partition_col_idx) = extra_partition_col_idx {
1652                    ProjectIdxVec::Prepare(*extra_partition_col_idx)
1653                } else {
1654                    ProjectIdxVec::None
1655                }
1656            },
1657        })
1658    }
1659}
1660
#[async_trait]
impl SinkWriter for IcebergSinkWriter {
    /// The per-checkpoint payload handed to the commit coordinator:
    /// `None` on non-checkpoint barriers, otherwise the serialized data-file
    /// manifest for this writer.
    type CommitMetadata = Option<SinkMetadata>;

    /// Begin a new epoch
    async fn begin_epoch(&mut self, _epoch: u64) -> Result<()> {
        // Lazy initialization: only the first epoch sees the `Created` state
        // (config + params only). Later epochs are already `Initialized` and
        // return immediately.
        let Self::Created(args) = self else {
            return Ok(());
        };

        let table = create_and_validate_table_impl(&args.config, &args.sink_param).await?;
        // Presence of unique column ids selects upsert mode; otherwise the
        // sink runs append-only.
        let inner = match &args.unique_column_ids {
            Some(unique_column_ids) => IcebergSinkWriterInner::build_upsert(
                &args.config,
                table,
                unique_column_ids.clone(),
                &args.writer_param,
            )?,
            None => {
                IcebergSinkWriterInner::build_append_only(&args.config, table, &args.writer_param)?
            }
        };

        *self = IcebergSinkWriter::Initialized(inner);
        Ok(())
    }

    /// Write a stream chunk to sink
    async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> {
        let Self::Initialized(inner) = self else {
            unreachable!("IcebergSinkWriter should be initialized before barrier");
        };

        // Try to build writer if it's None.
        // (`barrier` takes and closes the writer at every checkpoint; the
        // first write of the next epoch rebuilds it here.)
        match &mut inner.writer {
            IcebergWriterDispatch::Append {
                writer,
                writer_builder,
            } => {
                if writer.is_none() {
                    *writer = Some(Box::new(
                        writer_builder
                            .clone()
                            .build()
                            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
                    ));
                }
            }
            IcebergWriterDispatch::Upsert {
                writer,
                writer_builder,
                ..
            } => {
                if writer.is_none() {
                    *writer = Some(Box::new(
                        writer_builder
                            .clone()
                            .build()
                            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
                    ));
                }
            }
        };

        // Process the chunk.
        let (mut chunk, ops) = chunk.compact_vis().into_parts();
        // Drop the extra partition column (if configured) before writing.
        // The projection index vector is computed once on first use
        // (`Prepare` -> `Done`) and reused for all subsequent chunks.
        match &mut inner.project_idx_vec {
            ProjectIdxVec::None => {}
            ProjectIdxVec::Prepare(idx) => {
                if *idx >= chunk.columns().len() {
                    return Err(SinkError::Iceberg(anyhow!(
                        "invalid extra partition column index {}",
                        idx
                    )));
                }
                let project_idx_vec = (0..*idx)
                    .chain(*idx + 1..chunk.columns().len())
                    .collect_vec();
                chunk = chunk.project(&project_idx_vec);
                inner.project_idx_vec = ProjectIdxVec::Done(project_idx_vec);
            }
            ProjectIdxVec::Done(idx_vec) => {
                chunk = chunk.project(idx_vec);
            }
        }
        // Empty chunk: nothing to write.
        if ops.is_empty() {
            return Ok(());
        }
        let write_batch_size = chunk.estimated_heap_size();
        let batch = match &inner.writer {
            IcebergWriterDispatch::Append { .. } => {
                // separate out insert chunk: only `Insert` rows stay visible;
                // all other ops are masked out for an append-only table.
                let filters =
                    chunk.visibility() & ops.iter().map(|op| *op == Op::Insert).collect::<Bitmap>();
                chunk.set_visibility(filters);
                IcebergArrowConvert
                    .to_record_batch(inner.arrow_schema.clone(), &chunk.compact_vis())
                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?
            }
            IcebergWriterDispatch::Upsert {
                arrow_schema_with_op_column,
                ..
            } => {
                // Upsert mode keeps all rows and appends an extra Int32 "op"
                // column (insert/delete) consumed by the delta writer.
                let chunk = IcebergArrowConvert
                    .to_record_batch(inner.arrow_schema.clone(), &chunk)
                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
                let ops = Arc::new(Int32Array::from(
                    ops.iter()
                        .map(|op| match op {
                            Op::UpdateInsert | Op::Insert => INSERT_OP,
                            Op::UpdateDelete | Op::Delete => DELETE_OP,
                        })
                        .collect_vec(),
                ));
                let mut columns = chunk.columns().to_vec();
                columns.push(ops);
                RecordBatch::try_new(arrow_schema_with_op_column.clone(), columns)
                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?
            }
        };

        // Writer is guaranteed to exist here: it was (re)built at the top of
        // this function.
        let writer = inner.writer.get_writer().unwrap();
        writer
            .write(batch)
            .instrument_await("iceberg_write")
            .await
            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
        inner.metrics.write_bytes.inc_by(write_batch_size as _);
        Ok(())
    }

    /// Receive a barrier and mark the end of current epoch. When `is_checkpoint` is true, the sink
    /// writer should commit the current epoch.
    async fn barrier(&mut self, is_checkpoint: bool) -> Result<Option<SinkMetadata>> {
        let Self::Initialized(inner) = self else {
            unreachable!("IcebergSinkWriter should be initialized before barrier");
        };

        // Skip it if not checkpoint
        if !is_checkpoint {
            return Ok(None);
        }

        // Close the current writer to flush its data files, then eagerly
        // rebuild a fresh writer for the next epoch. Both dispatch arms are
        // intentionally identical apart from the variant they match.
        let close_result = match &mut inner.writer {
            IcebergWriterDispatch::Append {
                writer,
                writer_builder,
            } => {
                let close_result = match writer.take() {
                    Some(mut writer) => {
                        Some(writer.close().instrument_await("iceberg_close").await)
                    }
                    _ => None,
                };
                match writer_builder.clone().build() {
                    Ok(new_writer) => {
                        *writer = Some(Box::new(new_writer));
                    }
                    _ => {
                        // In this case, the writer is closed and we can't build a new writer. But we can't return the error
                        // here because current writer may close successfully. So we just log the error.
                        warn!("Failed to build new writer after close");
                    }
                }
                close_result
            }
            IcebergWriterDispatch::Upsert {
                writer,
                writer_builder,
                ..
            } => {
                let close_result = match writer.take() {
                    Some(mut writer) => {
                        Some(writer.close().instrument_await("iceberg_close").await)
                    }
                    _ => None,
                };
                match writer_builder.clone().build() {
                    Ok(new_writer) => {
                        *writer = Some(Box::new(new_writer));
                    }
                    _ => {
                        // In this case, the writer is closed and we can't build a new writer. But we can't return the error
                        // here because current writer may close successfully. So we just log the error.
                        warn!("Failed to build new writer after close");
                    }
                }
                close_result
            }
        };

        match close_result {
            Some(Ok(result)) => {
                let format_version = inner.table.metadata().format_version();
                let partition_type = inner.table.metadata().default_partition_type();
                let data_files = result
                    .into_iter()
                    .map(|f| {
                        // Truncate large column statistics BEFORE serialization
                        let truncated = truncate_datafile(f);
                        SerializedDataFile::try_from(truncated, partition_type, format_version)
                            .map_err(|err| SinkError::Iceberg(anyhow!(err)))
                    })
                    .collect::<Result<Vec<_>>>()?;
                // Hand the serialized manifest to the coordinator; the actual
                // Iceberg table commit happens there.
                Ok(Some(SinkMetadata::try_from(&IcebergCommitResult {
                    data_files,
                    schema_id: inner.table.metadata().current_schema_id(),
                    partition_spec_id: inner.table.metadata().default_partition_spec_id(),
                })?))
            }
            Some(Err(err)) => Err(SinkError::Iceberg(anyhow!(err))),
            None => Err(SinkError::Iceberg(anyhow!("No writer to close"))),
        }
    }
}
1876
// JSON object keys used in the commit metadata exchanged between sink writers
// and the commit coordinator (see `IcebergCommitResult`).
const SCHEMA_ID: &str = "schema_id";
const PARTITION_SPEC_ID: &str = "partition_spec_id";
const DATA_FILES: &str = "data_files";

/// Maximum size for column statistics (min/max values) in bytes.
/// Column statistics larger than this will be truncated to avoid metadata bloat.
/// This is especially important for large fields like JSONB, TEXT, BINARY, etc.
///
/// Fix for large column statistics in `DataFile` metadata that can cause OOM errors.
/// We truncate at the `DataFile` level (before serialization) by directly modifying
/// the public `lower_bounds` and `upper_bounds` fields.
///
/// This prevents metadata from ballooning to gigabytes when dealing with large
/// JSONB, TEXT, or BINARY fields, while still preserving statistics for small fields
/// that benefit from query optimization.
const MAX_COLUMN_STAT_SIZE: usize = 10240; // 10KB
1893
1894/// Truncate large column statistics from `DataFile` BEFORE serialization.
1895///
1896/// This function directly modifies `DataFile`'s `lower_bounds` and `upper_bounds`
1897/// to remove entries that exceed `MAX_COLUMN_STAT_SIZE`.
1898///
1899/// # Arguments
1900/// * `data_file` - A `DataFile` to process
1901///
1902/// # Returns
1903/// The modified `DataFile` with large statistics truncated
1904fn truncate_datafile(mut data_file: DataFile) -> DataFile {
1905    // Process lower_bounds - remove entries with large values
1906    data_file.lower_bounds.retain(|field_id, datum| {
1907        // Use to_bytes() to get the actual binary size without JSON serialization overhead
1908        let size = match datum.to_bytes() {
1909            Ok(bytes) => bytes.len(),
1910            Err(_) => 0,
1911        };
1912
1913        if size > MAX_COLUMN_STAT_SIZE {
1914            tracing::debug!(
1915                field_id = field_id,
1916                size = size,
1917                "Truncating large lower_bound statistic"
1918            );
1919            return false;
1920        }
1921        true
1922    });
1923
1924    // Process upper_bounds - remove entries with large values
1925    data_file.upper_bounds.retain(|field_id, datum| {
1926        // Use to_bytes() to get the actual binary size without JSON serialization overhead
1927        let size = match datum.to_bytes() {
1928            Ok(bytes) => bytes.len(),
1929            Err(_) => 0,
1930        };
1931
1932        if size > MAX_COLUMN_STAT_SIZE {
1933            tracing::debug!(
1934                field_id = field_id,
1935                size = size,
1936                "Truncating large upper_bound statistic"
1937            );
1938            return false;
1939        }
1940        true
1941    });
1942
1943    data_file
1944}
1945
/// Per-writer commit payload for one checkpoint: the data files produced by a
/// single parallelism, plus the schema / partition-spec ids they were written
/// against. Serialized to JSON and exchanged with the commit coordinator via
/// `SinkMetadata`.
#[derive(Default, Clone)]
struct IcebergCommitResult {
    /// Schema id the data files were written with; the coordinator verifies
    /// it against the table before committing.
    schema_id: i32,
    /// Partition spec id the data files were written with.
    partition_spec_id: i32,
    /// Data files pending commit, in serialized (spec-level) form.
    data_files: Vec<SerializedDataFile>,
}
1952
1953impl IcebergCommitResult {
1954    fn try_from(value: &SinkMetadata) -> Result<Self> {
1955        if let Some(Serialized(v)) = &value.metadata {
1956            let mut values = if let serde_json::Value::Object(v) =
1957                serde_json::from_slice::<serde_json::Value>(&v.metadata)
1958                    .context("Can't parse iceberg sink metadata")?
1959            {
1960                v
1961            } else {
1962                bail!("iceberg sink metadata should be an object");
1963            };
1964
1965            let schema_id;
1966            if let Some(serde_json::Value::Number(value)) = values.remove(SCHEMA_ID) {
1967                schema_id = value
1968                    .as_u64()
1969                    .ok_or_else(|| anyhow!("schema_id should be a u64"))?;
1970            } else {
1971                bail!("iceberg sink metadata should have schema_id");
1972            }
1973
1974            let partition_spec_id;
1975            if let Some(serde_json::Value::Number(value)) = values.remove(PARTITION_SPEC_ID) {
1976                partition_spec_id = value
1977                    .as_u64()
1978                    .ok_or_else(|| anyhow!("partition_spec_id should be a u64"))?;
1979            } else {
1980                bail!("iceberg sink metadata should have partition_spec_id");
1981            }
1982
1983            let data_files: Vec<SerializedDataFile>;
1984            if let serde_json::Value::Array(values) = values
1985                .remove(DATA_FILES)
1986                .ok_or_else(|| anyhow!("iceberg sink metadata should have data_files object"))?
1987            {
1988                data_files = values
1989                    .into_iter()
1990                    .map(from_value::<SerializedDataFile>)
1991                    .collect::<std::result::Result<_, _>>()
1992                    .unwrap();
1993            } else {
1994                bail!("iceberg sink metadata should have data_files object");
1995            }
1996
1997            Ok(Self {
1998                schema_id: schema_id as i32,
1999                partition_spec_id: partition_spec_id as i32,
2000                data_files,
2001            })
2002        } else {
2003            bail!("Can't create iceberg sink write result from empty data!")
2004        }
2005    }
2006
2007    fn try_from_serialized_bytes(value: Vec<u8>) -> Result<Self> {
2008        let mut values = if let serde_json::Value::Object(value) =
2009            serde_json::from_slice::<serde_json::Value>(&value)
2010                .context("Can't parse iceberg sink metadata")?
2011        {
2012            value
2013        } else {
2014            bail!("iceberg sink metadata should be an object");
2015        };
2016
2017        let schema_id;
2018        if let Some(serde_json::Value::Number(value)) = values.remove(SCHEMA_ID) {
2019            schema_id = value
2020                .as_u64()
2021                .ok_or_else(|| anyhow!("schema_id should be a u64"))?;
2022        } else {
2023            bail!("iceberg sink metadata should have schema_id");
2024        }
2025
2026        let partition_spec_id;
2027        if let Some(serde_json::Value::Number(value)) = values.remove(PARTITION_SPEC_ID) {
2028            partition_spec_id = value
2029                .as_u64()
2030                .ok_or_else(|| anyhow!("partition_spec_id should be a u64"))?;
2031        } else {
2032            bail!("iceberg sink metadata should have partition_spec_id");
2033        }
2034
2035        let data_files: Vec<SerializedDataFile>;
2036        if let serde_json::Value::Array(values) = values
2037            .remove(DATA_FILES)
2038            .ok_or_else(|| anyhow!("iceberg sink metadata should have data_files object"))?
2039        {
2040            data_files = values
2041                .into_iter()
2042                .map(from_value::<SerializedDataFile>)
2043                .collect::<std::result::Result<_, _>>()
2044                .unwrap();
2045        } else {
2046            bail!("iceberg sink metadata should have data_files object");
2047        }
2048
2049        Ok(Self {
2050            schema_id: schema_id as i32,
2051            partition_spec_id: partition_spec_id as i32,
2052            data_files,
2053        })
2054    }
2055}
2056
2057impl<'a> TryFrom<&'a IcebergCommitResult> for SinkMetadata {
2058    type Error = SinkError;
2059
2060    fn try_from(value: &'a IcebergCommitResult) -> std::result::Result<SinkMetadata, Self::Error> {
2061        let json_data_files = serde_json::Value::Array(
2062            value
2063                .data_files
2064                .iter()
2065                .map(serde_json::to_value)
2066                .collect::<std::result::Result<Vec<serde_json::Value>, _>>()
2067                .context("Can't serialize data files to json")?,
2068        );
2069        let json_value = serde_json::Value::Object(
2070            vec![
2071                (
2072                    SCHEMA_ID.to_owned(),
2073                    serde_json::Value::Number(value.schema_id.into()),
2074                ),
2075                (
2076                    PARTITION_SPEC_ID.to_owned(),
2077                    serde_json::Value::Number(value.partition_spec_id.into()),
2078                ),
2079                (DATA_FILES.to_owned(), json_data_files),
2080            ]
2081            .into_iter()
2082            .collect(),
2083        );
2084        Ok(SinkMetadata {
2085            metadata: Some(Serialized(SerializedMetadata {
2086                metadata: serde_json::to_vec(&json_value)
2087                    .context("Can't serialize iceberg sink metadata")?,
2088            })),
2089        })
2090    }
2091}
2092
2093impl TryFrom<IcebergCommitResult> for Vec<u8> {
2094    type Error = SinkError;
2095
2096    fn try_from(value: IcebergCommitResult) -> std::result::Result<Vec<u8>, Self::Error> {
2097        let json_data_files = serde_json::Value::Array(
2098            value
2099                .data_files
2100                .iter()
2101                .map(serde_json::to_value)
2102                .collect::<std::result::Result<Vec<serde_json::Value>, _>>()
2103                .context("Can't serialize data files to json")?,
2104        );
2105        let json_value = serde_json::Value::Object(
2106            vec![
2107                (
2108                    SCHEMA_ID.to_owned(),
2109                    serde_json::Value::Number(value.schema_id.into()),
2110                ),
2111                (
2112                    PARTITION_SPEC_ID.to_owned(),
2113                    serde_json::Value::Number(value.partition_spec_id.into()),
2114                ),
2115                (DATA_FILES.to_owned(), json_data_files),
2116            ]
2117            .into_iter()
2118            .collect(),
2119        );
2120        Ok(serde_json::to_vec(&json_value).context("Can't serialize iceberg sink metadata")?)
2121    }
2122}
/// Coordinator-side committer: collects the per-writer data-file manifests
/// and turns them into Iceberg table commits; also applies schema changes.
pub struct IcebergSinkCommitter {
    /// Catalog used to (re)load the table before each commit attempt.
    catalog: Arc<dyn Catalog>,
    /// Cached table handle; refreshed via `reload_table` at commit time.
    table: Table,
    pub last_commit_epoch: u64,
    pub(crate) sink_id: SinkId,
    pub(crate) config: IcebergConfig,
    pub(crate) param: SinkParam,
    /// How many times a failed commit is retried (see `commit_data_impl`).
    commit_retry_num: u32,
    /// Optional channel for notifying the compaction manager after commits —
    /// NOTE(review): presumably consumed elsewhere; not used in this chunk.
    pub(crate) iceberg_compact_stat_sender: Option<UnboundedSender<IcebergSinkCompactionUpdate>>,
}
2133
2134impl IcebergSinkCommitter {
2135    // Reload table and guarantee current schema_id and partition_spec_id matches
2136    // given `schema_id` and `partition_spec_id`
2137    async fn reload_table(
2138        catalog: &dyn Catalog,
2139        table_ident: &TableIdent,
2140        schema_id: i32,
2141        partition_spec_id: i32,
2142    ) -> Result<Table> {
2143        let table = catalog
2144            .load_table(table_ident)
2145            .await
2146            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
2147        if table.metadata().current_schema_id() != schema_id {
2148            return Err(SinkError::Iceberg(anyhow!(
2149                "Schema evolution not supported, expect schema id {}, but got {}",
2150                schema_id,
2151                table.metadata().current_schema_id()
2152            )));
2153        }
2154        if table.metadata().default_partition_spec_id() != partition_spec_id {
2155            return Err(SinkError::Iceberg(anyhow!(
2156                "Partition evolution not supported, expect partition spec id {}, but got {}",
2157                partition_spec_id,
2158                table.metadata().default_partition_spec_id()
2159            )));
2160        }
2161        Ok(table)
2162    }
2163}
2164
2165#[async_trait]
2166impl SinglePhaseCommitCoordinator for IcebergSinkCommitter {
2167    async fn init(&mut self) -> Result<()> {
2168        tracing::info!(
2169            sink_id = %self.param.sink_id,
2170            "Iceberg sink coordinator initialized",
2171        );
2172
2173        Ok(())
2174    }
2175
2176    async fn commit_data(&mut self, epoch: u64, metadata: Vec<SinkMetadata>) -> Result<()> {
2177        tracing::debug!("Starting iceberg direct commit in epoch {epoch}");
2178
2179        if metadata.is_empty() {
2180            tracing::debug!(?epoch, "No datafile to commit");
2181            return Ok(());
2182        }
2183
2184        // Commit data if present
2185        if let Some((write_results, snapshot_id)) = self.pre_commit_inner(epoch, metadata)? {
2186            self.commit_data_impl(epoch, write_results, snapshot_id)
2187                .await?;
2188        }
2189
2190        Ok(())
2191    }
2192
2193    async fn commit_schema_change(
2194        &mut self,
2195        epoch: u64,
2196        schema_change: PbSinkSchemaChange,
2197    ) -> Result<()> {
2198        tracing::info!(
2199            "Committing schema change {:?} in epoch {}",
2200            schema_change,
2201            epoch
2202        );
2203        self.commit_schema_change_impl(schema_change).await?;
2204        tracing::info!("Successfully committed schema change in epoch {}", epoch);
2205
2206        Ok(())
2207    }
2208}
2209
#[async_trait]
impl TwoPhaseCommitCoordinator for IcebergSinkCommitter {
    async fn init(&mut self) -> Result<()> {
        tracing::info!(
            sink_id = %self.param.sink_id,
            "Iceberg sink coordinator initialized",
        );

        Ok(())
    }

    /// Phase one: validate the per-writer results, pre-generate the snapshot
    /// id, and serialize everything into an opaque byte payload that gets
    /// persisted before the actual commit (enabling idempotent replay).
    async fn pre_commit(
        &mut self,
        epoch: u64,
        metadata: Vec<SinkMetadata>,
        _schema_change: Option<PbSinkSchemaChange>,
    ) -> Result<Option<Vec<u8>>> {
        tracing::debug!("Starting iceberg pre commit in epoch {epoch}");

        let (write_results, snapshot_id) = match self.pre_commit_inner(epoch, metadata)? {
            Some((write_results, snapshot_id)) => (write_results, snapshot_id),
            None => {
                tracing::debug!(?epoch, "no data to pre commit");
                return Ok(None);
            }
        };

        // Payload layout: one JSON blob per parallelism, then the snapshot id
        // as 8 little-endian bytes appended as the LAST element.
        // `commit_data` below relies on exactly this layout.
        let mut write_results_bytes = Vec::new();
        for each_parallelism_write_result in write_results {
            let each_parallelism_write_result_bytes: Vec<u8> =
                each_parallelism_write_result.try_into()?;
            write_results_bytes.push(each_parallelism_write_result_bytes);
        }

        let snapshot_id_bytes: Vec<u8> = snapshot_id.to_le_bytes().to_vec();
        write_results_bytes.push(snapshot_id_bytes);

        let pre_commit_metadata_bytes: Vec<u8> = serialize_metadata(write_results_bytes);
        Ok(Some(pre_commit_metadata_bytes))
    }

    /// Phase two: decode the payload produced by `pre_commit` and commit it.
    /// If the snapshot id is already present in the table (a replay after a
    /// coordinator restart), the commit is skipped, making this idempotent.
    async fn commit_data(&mut self, epoch: u64, commit_metadata: Vec<u8>) -> Result<()> {
        tracing::debug!("Starting iceberg commit in epoch {epoch}");

        if commit_metadata.is_empty() {
            tracing::debug!(?epoch, "No datafile to commit");
            return Ok(());
        }

        // Deserialize commit metadata
        let mut payload = deserialize_metadata(commit_metadata);
        if payload.is_empty() {
            return Err(SinkError::Iceberg(anyhow!(
                "Invalid commit metadata: empty payload"
            )));
        }

        // Last element is snapshot_id
        let snapshot_id_bytes = payload.pop().ok_or_else(|| {
            SinkError::Iceberg(anyhow!("Invalid commit metadata: missing snapshot_id"))
        })?;
        let snapshot_id = i64::from_le_bytes(
            snapshot_id_bytes
                .try_into()
                .map_err(|_| SinkError::Iceberg(anyhow!("Invalid snapshot id bytes")))?,
        );

        // Remaining elements are write_results
        let write_results = payload
            .into_iter()
            .map(IcebergCommitResult::try_from_serialized_bytes)
            .collect::<Result<Vec<_>>>()?;

        // Idempotency check: did a previous attempt already land this snapshot?
        let snapshot_committed = self
            .is_snapshot_id_in_iceberg(&self.config, snapshot_id)
            .await?;

        if snapshot_committed {
            tracing::info!(
                "Snapshot id {} already committed in iceberg table, skip committing again.",
                snapshot_id
            );
            return Ok(());
        }

        self.commit_data_impl(epoch, write_results, snapshot_id)
            .await
    }

    /// Apply a schema change, skipping if it was already applied (replay safety).
    async fn commit_schema_change(
        &mut self,
        epoch: u64,
        schema_change: PbSinkSchemaChange,
    ) -> Result<()> {
        let schema_updated = self.check_schema_change_applied(&schema_change)?;
        if schema_updated {
            tracing::info!("Schema change already committed in epoch {}, skip", epoch);
            return Ok(());
        }

        tracing::info!(
            "Committing schema change {:?} in epoch {}",
            schema_change,
            epoch
        );
        self.commit_schema_change_impl(schema_change).await?;
        tracing::info!("Successfully committed schema change in epoch {epoch}");

        Ok(())
    }

    async fn abort(&mut self, _epoch: u64, _commit_metadata: Vec<u8>) {
        // TODO: Files that have been written but not committed should be deleted.
        tracing::debug!("Abort not implemented yet");
    }
}
2326
/// Methods required to achieve exactly-once semantics.
2328impl IcebergSinkCommitter {
2329    fn pre_commit_inner(
2330        &mut self,
2331        _epoch: u64,
2332        metadata: Vec<SinkMetadata>,
2333    ) -> Result<Option<(Vec<IcebergCommitResult>, i64)>> {
2334        let write_results: Vec<IcebergCommitResult> = metadata
2335            .iter()
2336            .map(IcebergCommitResult::try_from)
2337            .collect::<Result<Vec<IcebergCommitResult>>>()?;
2338
2339        // Skip if no data to commit
2340        if write_results.is_empty() || write_results.iter().all(|r| r.data_files.is_empty()) {
2341            return Ok(None);
2342        }
2343
2344        let expect_schema_id = write_results[0].schema_id;
2345        let expect_partition_spec_id = write_results[0].partition_spec_id;
2346
2347        // guarantee that all write results has same schema_id and partition_spec_id
2348        if write_results
2349            .iter()
2350            .any(|r| r.schema_id != expect_schema_id)
2351            || write_results
2352                .iter()
2353                .any(|r| r.partition_spec_id != expect_partition_spec_id)
2354        {
2355            return Err(SinkError::Iceberg(anyhow!(
2356                "schema_id and partition_spec_id should be the same in all write results"
2357            )));
2358        }
2359
2360        let snapshot_id = FastAppendAction::generate_snapshot_id(&self.table);
2361
2362        Ok(Some((write_results, snapshot_id)))
2363    }
2364
    /// Convert the pre-committed write results into `DataFile`s and append them to
    /// the iceberg table as one snapshot (with the pre-generated `snapshot_id`),
    /// retrying commit conflicts with exponential backoff.
    ///
    /// Preconditions (enforced by the caller and re-asserted here): `write_results`
    /// is non-empty, contains at least one data file, and all results share the
    /// same `schema_id` / `partition_spec_id`.
    async fn commit_data_impl(
        &mut self,
        epoch: u64,
        write_results: Vec<IcebergCommitResult>,
        snapshot_id: i64,
    ) -> Result<()> {
        // Empty write results should be handled before calling this function.
        assert!(
            !write_results.is_empty() && !write_results.iter().all(|r| r.data_files.is_empty())
        );

        // Check snapshot limit before proceeding with commit; may block until
        // compaction catches up (see `wait_for_snapshot_limit`).
        self.wait_for_snapshot_limit().await?;

        let expect_schema_id = write_results[0].schema_id;
        let expect_partition_spec_id = write_results[0].partition_spec_id;

        // Load the latest table to avoid concurrent modification with the best effort.
        self.table = Self::reload_table(
            self.catalog.as_ref(),
            self.table.identifier(),
            expect_schema_id,
            expect_partition_spec_id,
        )
        .await?;

        let Some(schema) = self.table.metadata().schema_by_id(expect_schema_id) else {
            return Err(SinkError::Iceberg(anyhow!(
                "Can't find schema by id {}",
                expect_schema_id
            )));
        };
        let Some(partition_spec) = self
            .table
            .metadata()
            .partition_spec_by_id(expect_partition_spec_id)
        else {
            return Err(SinkError::Iceberg(anyhow!(
                "Can't find partition spec by id {}",
                expect_partition_spec_id
            )));
        };
        // Concrete struct type of the partition values — needed to turn the
        // serialized data files back into `DataFile`s below.
        let partition_type = partition_spec
            .as_ref()
            .clone()
            .partition_type(schema)
            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;

        // Deserialize every writer's files against the expected spec/schema,
        // failing the whole commit on the first bad file.
        let data_files = write_results
            .into_iter()
            .flat_map(|r| {
                r.data_files.into_iter().map(|f| {
                    f.try_into(expect_partition_spec_id, &partition_type, schema)
                        .map_err(|err| SinkError::Iceberg(anyhow!(err)))
                })
            })
            .collect::<Result<Vec<DataFile>>>()?;
        // # TODO:
        // This retry behavior should be revert and do in iceberg-rust when it supports retry(Track in: https://github.com/apache/iceberg-rust/issues/964)
        // because retry logic involved reapply the commit metadata.
        // For now, we just retry the commit operation.
        let retry_strategy = ExponentialBackoff::from_millis(10)
            .max_delay(Duration::from_secs(60))
            .map(jitter)
            .take(self.commit_retry_num as usize);
        let catalog = self.catalog.clone();
        let table_ident = self.table.identifier().clone();

        // Custom retry logic that:
        // 1. Calls reload_table before each commit attempt to get the latest metadata
        // 2. If reload_table fails (table not exists/schema/partition mismatch), stops retrying immediately
        // 3. If commit fails, retries with backoff
        enum CommitError {
            ReloadTable(SinkError), // Non-retriable: schema/partition mismatch
            Commit(SinkError),      // Retriable: commit conflicts, network errors
        }

        let table = RetryIf::spawn(
            retry_strategy,
            || async {
                // Reload table before each commit attempt to get the latest metadata
                let table = Self::reload_table(
                    catalog.as_ref(),
                    &table_ident,
                    expect_schema_id,
                    expect_partition_spec_id,
                )
                .await
                .map_err(|e| {
                    tracing::error!(error = %e.as_report(), "Failed to reload iceberg table");
                    CommitError::ReloadTable(e)
                })?;

                // Fast-append all data files under the pre-generated snapshot id,
                // targeting the branch chosen by sink type / write mode.
                let txn = Transaction::new(&table);
                let append_action = txn
                    .fast_append()
                    .set_snapshot_id(snapshot_id)
                    .set_target_branch(commit_branch(
                        self.config.r#type.as_str(),
                        self.config.write_mode,
                    ))
                    .add_data_files(data_files.clone());

                let tx = append_action.apply(txn).map_err(|err| {
                    let err: IcebergError = err.into();
                    tracing::error!(error = %err.as_report(), "Failed to apply iceberg table");
                    CommitError::Commit(SinkError::Iceberg(anyhow!(err)))
                })?;

                tx.commit(catalog.as_ref()).await.map_err(|err| {
                    let err: IcebergError = err.into();
                    tracing::error!(error = %err.as_report(), "Failed to commit iceberg table");
                    CommitError::Commit(SinkError::Iceberg(anyhow!(err)))
                })
            },
            |err: &CommitError| {
                // Only retry on commit errors, not on reload_table errors
                match err {
                    CommitError::Commit(_) => {
                        tracing::warn!("Commit failed, will retry");
                        true
                    }
                    CommitError::ReloadTable(_) => {
                        tracing::error!(
                            "reload_table failed with non-retriable error, will not retry"
                        );
                        false
                    }
                }
            },
        )
        .await
        .map_err(|e| match e {
            CommitError::ReloadTable(e) | CommitError::Commit(e) => e,
        })?;
        // Cache the freshly committed table so later operations see this snapshot.
        self.table = table;

        // Export the post-commit snapshot count as a gauge for observability.
        let snapshot_num = self.table.metadata().snapshots().count();
        let catalog_name = self.config.common.catalog_name();
        let table_name = self.table.identifier().to_string();
        let metrics_labels = [&self.param.sink_name, &catalog_name, &table_name];
        GLOBAL_SINK_METRICS
            .iceberg_snapshot_num
            .with_guarded_label_values(&metrics_labels)
            .set(snapshot_num as i64);

        tracing::debug!("Succeeded to commit to iceberg table in epoch {epoch}.");

        // Best-effort nudge to the compaction scheduler; losing this message only
        // delays compaction, so a send failure is just a warning.
        if let Some(iceberg_compact_stat_sender) = &self.iceberg_compact_stat_sender
            && self.config.enable_compaction
            && iceberg_compact_stat_sender
                .send(IcebergSinkCompactionUpdate {
                    sink_id: self.sink_id,
                    compaction_interval: self.config.compaction_interval_sec(),
                    force_compaction: false,
                })
                .is_err()
        {
            warn!("failed to send iceberg compaction stats");
        }

        Ok(())
    }
2528
2529    /// During pre-commit metadata, we record the `snapshot_id` corresponding to each batch of files.
2530    /// Therefore, the logic for checking whether all files in this batch are present in Iceberg
2531    /// has been changed to verifying if their corresponding `snapshot_id` exists in Iceberg.
2532    async fn is_snapshot_id_in_iceberg(
2533        &self,
2534        iceberg_config: &IcebergConfig,
2535        snapshot_id: i64,
2536    ) -> Result<bool> {
2537        let table = iceberg_config.load_table().await?;
2538        if table.metadata().snapshot_by_id(snapshot_id).is_some() {
2539            Ok(true)
2540        } else {
2541            Ok(false)
2542        }
2543    }
2544
    /// Check if the specified columns already exist in the iceberg table's current schema.
    /// This is used to determine if schema change has already been applied.
    ///
    /// Returns:
    /// - `Ok(false)` — current schema still equals `original_schema` (change not applied);
    /// - `Ok(true)` — current schema equals `original_schema` + added columns (change applied);
    /// - `Err` — the op is not `AddColumns`, or the current schema matches neither
    ///   shape, in which case applied-ness cannot be decided.
    ///
    /// NOTE(review): `schema_matches` compares by (name, data type) with equal field
    /// counts; it ignores field ordering and nullability — confirm that is intended.
    fn check_schema_change_applied(&self, schema_change: &PbSinkSchemaChange) -> Result<bool> {
        let current_schema = self.table.metadata().current_schema();
        let current_arrow_schema = schema_to_arrow_schema(current_schema.as_ref())
            .context("Failed to convert schema")
            .map_err(SinkError::Iceberg)?;

        let iceberg_arrow_convert = IcebergArrowConvert;

        // True iff every expected field has a same-named, same-typed counterpart in
        // the current schema and the field counts agree (order-insensitive).
        let schema_matches = |expected: &[ArrowField]| {
            if current_arrow_schema.fields().len() != expected.len() {
                return false;
            }

            expected.iter().all(|expected_field| {
                current_arrow_schema.fields().iter().any(|current_field| {
                    current_field.name() == expected_field.name()
                        && current_field.data_type() == expected_field.data_type()
                })
            })
        };

        // Convert the recorded pre-change schema into arrow fields for comparison.
        let original_arrow_fields: Vec<ArrowField> = schema_change
            .original_schema
            .iter()
            .map(|pb_field| {
                let field = Field::from(pb_field);
                iceberg_arrow_convert
                    .to_arrow_field(&field.name, &field.data_type)
                    .context("Failed to convert field to arrow")
                    .map_err(SinkError::Iceberg)
            })
            .collect::<Result<_>>()?;

        // If current schema equals original_schema, then schema change is NOT applied.
        if schema_matches(&original_arrow_fields) {
            tracing::debug!(
                "Current iceberg schema matches original_schema ({} columns); schema change not applied",
                original_arrow_fields.len()
            );
            return Ok(false);
        }

        // We only support add_columns for now.
        let Some(risingwave_pb::stream_plan::sink_schema_change::Op::AddColumns(add_columns_op)) =
            schema_change.op.as_ref()
        else {
            return Err(SinkError::Iceberg(anyhow!(
                "Unsupported sink schema change op in iceberg sink: {:?}",
                schema_change.op
            )));
        };

        let add_arrow_fields: Vec<ArrowField> = add_columns_op
            .fields
            .iter()
            .map(|pb_field| {
                let field = Field::from(pb_field);
                iceberg_arrow_convert
                    .to_arrow_field(&field.name, &field.data_type)
                    .context("Failed to convert field to arrow")
                    .map_err(SinkError::Iceberg)
            })
            .collect::<Result<_>>()?;

        let mut expected_after_change = original_arrow_fields;
        expected_after_change.extend(add_arrow_fields);

        // If current schema equals original_schema + add_columns, then schema change is applied.
        if schema_matches(&expected_after_change) {
            tracing::debug!(
                "Current iceberg schema matches original_schema + add_columns ({} columns); schema change already applied",
                expected_after_change.len()
            );
            return Ok(true);
        }

        Err(SinkError::Iceberg(anyhow!(
            "Current iceberg schema does not match either original_schema ({} cols) or original_schema + add_columns; cannot determine whether schema change is applied",
            schema_change.original_schema.len()
        )))
    }
2628
    /// Commit schema changes (e.g., add columns) to the iceberg table.
    /// This function uses Transaction API to atomically update the table schema
    /// with optimistic locking to prevent concurrent conflicts.
    ///
    /// Only the `AddColumns` op is supported; any other op yields an error.
    /// Added columns are created as optional (nullable) fields, so existing rows
    /// stay valid without a backfill.
    async fn commit_schema_change_impl(&mut self, schema_change: PbSinkSchemaChange) -> Result<()> {
        use iceberg::spec::NestedField;

        let Some(risingwave_pb::stream_plan::sink_schema_change::Op::AddColumns(add_columns_op)) =
            schema_change.op.as_ref()
        else {
            return Err(SinkError::Iceberg(anyhow!(
                "Unsupported sink schema change op in iceberg sink: {:?}",
                schema_change.op
            )));
        };

        let add_columns = add_columns_op.fields.iter().map(Field::from).collect_vec();

        // Step 1: Get current table metadata
        // Iceberg field IDs must never be reused, so new IDs continue from the
        // highest column id ever assigned to this table.
        let metadata = self.table.metadata();
        let mut next_field_id = metadata.last_column_id() + 1;
        tracing::debug!("Starting schema change, next_field_id: {}", next_field_id);

        // Step 2: Build new fields to add
        let iceberg_create_table_arrow_convert = IcebergCreateTableArrowConvert::default();
        let mut new_fields = Vec::new();

        for field in &add_columns {
            // Convert RisingWave Field to Arrow Field using IcebergCreateTableArrowConvert
            let arrow_field = iceberg_create_table_arrow_convert
                .to_arrow_field(&field.name, &field.data_type)
                .with_context(|| format!("Failed to convert field '{}' to arrow", field.name))
                .map_err(SinkError::Iceberg)?;

            // Convert Arrow DataType to Iceberg Type
            let iceberg_type = iceberg::arrow::arrow_type_to_type(arrow_field.data_type())
                .map_err(|err| {
                    SinkError::Iceberg(
                        anyhow!(err).context("Failed to convert Arrow type to Iceberg type"),
                    )
                })?;

            // Create NestedField with the next available field ID
            let nested_field = Arc::new(NestedField::optional(
                next_field_id,
                &field.name,
                iceberg_type,
            ));

            new_fields.push(nested_field);
            tracing::info!("Prepared field '{}' with ID {}", field.name, next_field_id);
            next_field_id += 1;
        }

        // Step 3: Create Transaction with UpdateSchemaAction
        tracing::info!(
            "Committing schema change to catalog for table {}",
            self.table.identifier()
        );

        let txn = Transaction::new(&self.table);
        let action = txn.update_schema().add_fields(new_fields);

        let updated_table = action
            .apply(txn)
            .context("Failed to apply schema update action")
            .map_err(SinkError::Iceberg)?
            .commit(self.catalog.as_ref())
            .await
            .context("Failed to commit table schema change")
            .map_err(SinkError::Iceberg)?;

        // Cache the refreshed table so subsequent writes see the new schema.
        self.table = updated_table;

        tracing::info!(
            "Successfully committed schema change, added {} columns to iceberg table",
            add_columns.len()
        );

        Ok(())
    }
2709
2710    /// Check if the number of snapshots since the last rewrite/overwrite operation exceeds the limit
2711    /// Returns the number of snapshots since the last rewrite/overwrite
2712    fn count_snapshots_since_rewrite(&self) -> usize {
2713        let mut snapshots: Vec<_> = self.table.metadata().snapshots().collect();
2714        snapshots.sort_by_key(|b| std::cmp::Reverse(b.timestamp_ms()));
2715
2716        // Iterate through snapshots in reverse order (newest first) to find the last rewrite/overwrite
2717        let mut count = 0;
2718        for snapshot in snapshots {
2719            // Check if this snapshot represents a rewrite or overwrite operation
2720            let summary = snapshot.summary();
2721            match &summary.operation {
2722                Operation::Replace => {
2723                    // Found a rewrite/overwrite operation, stop counting
2724                    break;
2725                }
2726
2727                _ => {
2728                    // Increment count for each snapshot that is not a rewrite/overwrite
2729                    count += 1;
2730                }
2731            }
2732        }
2733
2734        count
2735    }
2736
    /// Wait until snapshot count since last rewrite is below the limit
    ///
    /// Only applies when compaction is enabled and
    /// `max_snapshots_num_before_compaction` is configured; otherwise returns
    /// immediately. While over the limit this loops: it nudges the compaction
    /// scheduler with `force_compaction: true`, sleeps 30 seconds, and reloads
    /// the table to observe new snapshots.
    async fn wait_for_snapshot_limit(&mut self) -> Result<()> {
        if !self.config.enable_compaction {
            return Ok(());
        }

        if let Some(max_snapshots) = self.config.max_snapshots_num_before_compaction {
            loop {
                let current_count = self.count_snapshots_since_rewrite();

                if current_count < max_snapshots {
                    tracing::info!(
                        "Snapshot count check passed: {} < {}",
                        current_count,
                        max_snapshots
                    );
                    break;
                }

                tracing::info!(
                    "Snapshot count {} exceeds limit {}, waiting...",
                    current_count,
                    max_snapshots
                );

                // Best-effort: ask the compactor to run ASAP. Losing this message
                // only delays compaction, so a failed send is just a warning.
                if let Some(iceberg_compact_stat_sender) = &self.iceberg_compact_stat_sender
                    && iceberg_compact_stat_sender
                        .send(IcebergSinkCompactionUpdate {
                            sink_id: self.sink_id,
                            compaction_interval: self.config.compaction_interval_sec(),
                            force_compaction: true,
                        })
                        .is_err()
                {
                    tracing::warn!("failed to send iceberg compaction stats");
                }

                // Wait for 30 seconds before checking again
                tokio::time::sleep(Duration::from_secs(30)).await;

                // Reload table to get latest snapshots
                self.table = self.config.load_table().await?;
            }
        }
        Ok(())
    }
2783}
2784
// Child-field names under which a Map type's key/value are registered when
// flattening a RisingWave `Map` for schema-compatibility checks (see `get_fields`).
const MAP_KEY: &str = "key";
const MAP_VALUE: &str = "value";
2787
2788fn get_fields<'a>(
2789    our_field_type: &'a risingwave_common::types::DataType,
2790    data_type: &ArrowDataType,
2791    schema_fields: &mut HashMap<&'a str, &'a risingwave_common::types::DataType>,
2792) -> Option<ArrowFields> {
2793    match data_type {
2794        ArrowDataType::Struct(fields) => {
2795            match our_field_type {
2796                risingwave_common::types::DataType::Struct(struct_fields) => {
2797                    struct_fields.iter().for_each(|(name, data_type)| {
2798                        let res = schema_fields.insert(name, data_type);
2799                        // This assert is to make sure there is no duplicate field name in the schema.
2800                        assert!(res.is_none())
2801                    });
2802                }
2803                risingwave_common::types::DataType::Map(map_fields) => {
2804                    schema_fields.insert(MAP_KEY, map_fields.key());
2805                    schema_fields.insert(MAP_VALUE, map_fields.value());
2806                }
2807                risingwave_common::types::DataType::List(list) => {
2808                    list.elem()
2809                        .as_struct()
2810                        .iter()
2811                        .for_each(|(name, data_type)| {
2812                            let res = schema_fields.insert(name, data_type);
2813                            // This assert is to make sure there is no duplicate field name in the schema.
2814                            assert!(res.is_none())
2815                        });
2816                }
2817                _ => {}
2818            };
2819            Some(fields.clone())
2820        }
2821        ArrowDataType::List(field) | ArrowDataType::Map(field, _) => {
2822            get_fields(our_field_type, field.data_type(), schema_fields)
2823        }
2824        _ => None, // not a supported complex type and unlikely to show up
2825    }
2826}
2827
2828fn check_compatibility(
2829    schema_fields: HashMap<&str, &risingwave_common::types::DataType>,
2830    fields: &ArrowFields,
2831) -> anyhow::Result<bool> {
2832    for arrow_field in fields {
2833        let our_field_type = schema_fields
2834            .get(arrow_field.name().as_str())
2835            .ok_or_else(|| anyhow!("Field {} not found in our schema", arrow_field.name()))?;
2836
2837        // Iceberg source should be able to read iceberg decimal type.
2838        let converted_arrow_data_type = IcebergArrowConvert
2839            .to_arrow_field("", our_field_type)
2840            .map_err(|e| anyhow!(e))?
2841            .data_type()
2842            .clone();
2843
2844        let compatible = match (&converted_arrow_data_type, arrow_field.data_type()) {
2845            (ArrowDataType::Decimal128(_, _), ArrowDataType::Decimal128(_, _)) => true,
2846            (ArrowDataType::Binary, ArrowDataType::LargeBinary) => true,
2847            (ArrowDataType::LargeBinary, ArrowDataType::Binary) => true,
2848            (ArrowDataType::List(_), ArrowDataType::List(field))
2849            | (ArrowDataType::Map(_, _), ArrowDataType::Map(field, _)) => {
2850                let mut schema_fields = HashMap::new();
2851                get_fields(our_field_type, field.data_type(), &mut schema_fields)
2852                    .is_none_or(|fields| check_compatibility(schema_fields, &fields).unwrap())
2853            }
2854            // validate nested structs
2855            (ArrowDataType::Struct(_), ArrowDataType::Struct(fields)) => {
2856                let mut schema_fields = HashMap::new();
2857                our_field_type
2858                    .as_struct()
2859                    .iter()
2860                    .for_each(|(name, data_type)| {
2861                        let res = schema_fields.insert(name, data_type);
2862                        // This assert is to make sure there is no duplicate field name in the schema.
2863                        assert!(res.is_none())
2864                    });
2865                check_compatibility(schema_fields, fields)?
2866            }
2867            // cases where left != right (metadata, field name mismatch)
2868            //
2869            // all nested types: in iceberg `field_id` will always be present, but RW doesn't have it:
2870            // {"PARQUET:field_id": ".."}
2871            //
2872            // map: The standard name in arrow is "entries", "key", "value".
2873            // in iceberg-rs, it's called "key_value"
2874            (left, right) => left.equals_datatype(right),
2875        };
2876        if !compatible {
2877            bail!(
2878                "field {}'s type is incompatible\nRisingWave converted data type: {}\niceberg's data type: {}",
2879                arrow_field.name(),
2880                converted_arrow_data_type,
2881                arrow_field.data_type()
2882            );
2883        }
2884    }
2885    Ok(true)
2886}
2887
2888/// Try to match our schema with iceberg schema.
2889pub fn try_matches_arrow_schema(rw_schema: &Schema, arrow_schema: &ArrowSchema) -> Result<()> {
2890    if rw_schema.fields.len() != arrow_schema.fields().len() {
2891        bail!(
2892            "Schema length mismatch, risingwave is {}, and iceberg is {}",
2893            rw_schema.fields.len(),
2894            arrow_schema.fields.len()
2895        );
2896    }
2897
2898    let mut schema_fields = HashMap::new();
2899    rw_schema.fields.iter().for_each(|field| {
2900        let res = schema_fields.insert(field.name.as_str(), &field.data_type);
2901        // This assert is to make sure there is no duplicate field name in the schema.
2902        assert!(res.is_none())
2903    });
2904
2905    check_compatibility(schema_fields, &arrow_schema.fields)?;
2906    Ok(())
2907}
2908
/// Serialize per-writer commit metadata (one opaque byte blob per writer) into
/// a single JSON byte vector.
///
/// `unwrap` is safe: JSON serialization of `Vec<Vec<u8>>` is infallible.
fn serialize_metadata(metadata: Vec<Vec<u8>>) -> Vec<u8> {
    serde_json::to_vec(&metadata).unwrap()
}
2912
/// Inverse of `serialize_metadata`.
///
/// NOTE(review): panics if `bytes` is not valid JSON of the expected shape;
/// callers are expected to only pass back `serialize_metadata` output.
fn deserialize_metadata(bytes: Vec<u8>) -> Vec<Vec<u8>> {
    serde_json::from_slice(&bytes).unwrap()
}
2916
/// Parse a comma-separated partition-by clause (e.g. `c1, bucket(16,c2), day(ts)`)
/// into `(column_name, transform)` pairs.
///
/// Supported per-field forms: `column`, `transform(column)`,
/// `transform(n,column)`, `transform(n, column)`; the numeric `n` is mandatory
/// for `bucket` and `truncate`.
pub fn parse_partition_by_exprs(
    expr: String,
) -> std::result::Result<Vec<(String, Transform)>, anyhow::Error> {
    // captures column, transform(column), transform(n,column), transform(n, column)
    let re = Regex::new(r"(?<transform>\w+)(\(((?<n>\d+)?(?:,|(,\s)))?(?<field>\w+)\))?").unwrap();
    if !re.is_match(&expr) {
        bail!(format!(
            "Invalid partition fields: {}\nHINT: Supported formats are column, transform(column), transform(n,column), transform(n, column)",
            expr
        ))
    }
    let caps = re.captures_iter(&expr);

    let mut partition_columns = vec![];

    for mat in caps {
        // A bare `column` match captures neither `n` nor `field`: identity partition.
        let (column, transform) = if mat.name("n").is_none() && mat.name("field").is_none() {
            (&mat["transform"], Transform::Identity)
        } else {
            let mut func = mat["transform"].to_owned();
            // `Transform::from_str` expects the parameter in brackets, e.g. `bucket[16]`.
            if func == "bucket" || func == "truncate" {
                let n = &mat
                    .name("n")
                    .ok_or_else(|| anyhow!("The `n` must be set with `bucket` and `truncate`"))?
                    .as_str();
                func = format!("{func}[{n}]");
            }
            (
                &mat["field"],
                Transform::from_str(&func)
                    .with_context(|| format!("invalid transform function {}", func))?,
            )
        };
        partition_columns.push((column.to_owned(), transform));
    }
    Ok(partition_columns)
}
2954
2955pub fn commit_branch(sink_type: &str, write_mode: IcebergWriteMode) -> String {
2956    if should_enable_iceberg_cow(sink_type, write_mode) {
2957        ICEBERG_COW_BRANCH.to_owned()
2958    } else {
2959        MAIN_BRANCH.to_owned()
2960    }
2961}
2962
2963pub fn should_enable_iceberg_cow(sink_type: &str, write_mode: IcebergWriteMode) -> bool {
2964    sink_type == SINK_TYPE_UPSERT && write_mode == IcebergWriteMode::CopyOnWrite
2965}
2966
// Empty marker impls: allow these enums to be used as values in the sink's
// `WITH (...)` options machinery (trait requirements satisfied by defaults —
// NOTE(review): `WithOptions` is project-defined; confirm no methods are needed).
impl crate::with_options::WithOptions for IcebergWriteMode {}

impl crate::with_options::WithOptions for FormatVersion {}

impl crate::with_options::WithOptions for CompactionType {}
2972
2973#[cfg(test)]
2974mod test {
2975    use std::collections::BTreeMap;
2976
2977    use risingwave_common::array::arrow::arrow_schema_iceberg::FieldRef as ArrowFieldRef;
2978    use risingwave_common::types::{DataType, MapType, StructType};
2979
2980    use crate::connector_common::{IcebergCommon, IcebergTableIdentifier};
2981    use crate::sink::decouple_checkpoint_log_sink::ICEBERG_DEFAULT_COMMIT_CHECKPOINT_INTERVAL;
2982    use crate::sink::iceberg::{
2983        COMPACTION_INTERVAL_SEC, COMPACTION_MAX_SNAPSHOTS_NUM, CompactionType, ENABLE_COMPACTION,
2984        ENABLE_SNAPSHOT_EXPIRATION, FormatVersion, IcebergConfig, IcebergWriteMode,
2985        SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_FILES, SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_META_DATA,
2986        SNAPSHOT_EXPIRATION_MAX_AGE_MILLIS, SNAPSHOT_EXPIRATION_RETAIN_LAST, WRITE_MODE,
2987    };
2988
2989    pub const DEFAULT_ICEBERG_COMPACTION_INTERVAL: u64 = 3600; // 1 hour
2990
2991    #[test]
2992    fn test_compatible_arrow_schema() {
2993        use arrow_schema_iceberg::{DataType as ArrowDataType, Field as ArrowField};
2994
2995        use super::*;
2996        let risingwave_schema = Schema::new(vec![
2997            Field::with_name(DataType::Int32, "a"),
2998            Field::with_name(DataType::Int32, "b"),
2999            Field::with_name(DataType::Int32, "c"),
3000        ]);
3001        let arrow_schema = ArrowSchema::new(vec![
3002            ArrowField::new("a", ArrowDataType::Int32, false),
3003            ArrowField::new("b", ArrowDataType::Int32, false),
3004            ArrowField::new("c", ArrowDataType::Int32, false),
3005        ]);
3006
3007        try_matches_arrow_schema(&risingwave_schema, &arrow_schema).unwrap();
3008
3009        let risingwave_schema = Schema::new(vec![
3010            Field::with_name(DataType::Int32, "d"),
3011            Field::with_name(DataType::Int32, "c"),
3012            Field::with_name(DataType::Int32, "a"),
3013            Field::with_name(DataType::Int32, "b"),
3014        ]);
3015        let arrow_schema = ArrowSchema::new(vec![
3016            ArrowField::new("a", ArrowDataType::Int32, false),
3017            ArrowField::new("b", ArrowDataType::Int32, false),
3018            ArrowField::new("d", ArrowDataType::Int32, false),
3019            ArrowField::new("c", ArrowDataType::Int32, false),
3020        ]);
3021        try_matches_arrow_schema(&risingwave_schema, &arrow_schema).unwrap();
3022
3023        let risingwave_schema = Schema::new(vec![
3024            Field::with_name(
3025                DataType::Struct(StructType::new(vec![
3026                    ("a1", DataType::Int32),
3027                    (
3028                        "a2",
3029                        DataType::Struct(StructType::new(vec![
3030                            ("a21", DataType::Bytea),
3031                            (
3032                                "a22",
3033                                DataType::Map(MapType::from_kv(DataType::Varchar, DataType::Jsonb)),
3034                            ),
3035                        ])),
3036                    ),
3037                ])),
3038                "a",
3039            ),
3040            Field::with_name(
3041                DataType::list(DataType::Struct(StructType::new(vec![
3042                    ("b1", DataType::Int32),
3043                    ("b2", DataType::Bytea),
3044                    (
3045                        "b3",
3046                        DataType::Map(MapType::from_kv(DataType::Varchar, DataType::Jsonb)),
3047                    ),
3048                ]))),
3049                "b",
3050            ),
3051            Field::with_name(
3052                DataType::Map(MapType::from_kv(
3053                    DataType::Varchar,
3054                    DataType::list(DataType::Struct(StructType::new([
3055                        ("c1", DataType::Int32),
3056                        ("c2", DataType::Bytea),
3057                        (
3058                            "c3",
3059                            DataType::Map(MapType::from_kv(DataType::Varchar, DataType::Jsonb)),
3060                        ),
3061                    ]))),
3062                )),
3063                "c",
3064            ),
3065        ]);
3066        let arrow_schema = ArrowSchema::new(vec![
3067            ArrowField::new(
3068                "a",
3069                ArrowDataType::Struct(ArrowFields::from(vec![
3070                    ArrowField::new("a1", ArrowDataType::Int32, false),
3071                    ArrowField::new(
3072                        "a2",
3073                        ArrowDataType::Struct(ArrowFields::from(vec![
3074                            ArrowField::new("a21", ArrowDataType::LargeBinary, false),
3075                            ArrowField::new_map(
3076                                "a22",
3077                                "entries",
3078                                ArrowFieldRef::new(ArrowField::new(
3079                                    "key",
3080                                    ArrowDataType::Utf8,
3081                                    false,
3082                                )),
3083                                ArrowFieldRef::new(ArrowField::new(
3084                                    "value",
3085                                    ArrowDataType::Utf8,
3086                                    false,
3087                                )),
3088                                false,
3089                                false,
3090                            ),
3091                        ])),
3092                        false,
3093                    ),
3094                ])),
3095                false,
3096            ),
3097            ArrowField::new(
3098                "b",
3099                ArrowDataType::List(ArrowFieldRef::new(ArrowField::new_list_field(
3100                    ArrowDataType::Struct(ArrowFields::from(vec![
3101                        ArrowField::new("b1", ArrowDataType::Int32, false),
3102                        ArrowField::new("b2", ArrowDataType::LargeBinary, false),
3103                        ArrowField::new_map(
3104                            "b3",
3105                            "entries",
3106                            ArrowFieldRef::new(ArrowField::new("key", ArrowDataType::Utf8, false)),
3107                            ArrowFieldRef::new(ArrowField::new(
3108                                "value",
3109                                ArrowDataType::Utf8,
3110                                false,
3111                            )),
3112                            false,
3113                            false,
3114                        ),
3115                    ])),
3116                    false,
3117                ))),
3118                false,
3119            ),
3120            ArrowField::new_map(
3121                "c",
3122                "entries",
3123                ArrowFieldRef::new(ArrowField::new("key", ArrowDataType::Utf8, false)),
3124                ArrowFieldRef::new(ArrowField::new(
3125                    "value",
3126                    ArrowDataType::List(ArrowFieldRef::new(ArrowField::new_list_field(
3127                        ArrowDataType::Struct(ArrowFields::from(vec![
3128                            ArrowField::new("c1", ArrowDataType::Int32, false),
3129                            ArrowField::new("c2", ArrowDataType::LargeBinary, false),
3130                            ArrowField::new_map(
3131                                "c3",
3132                                "entries",
3133                                ArrowFieldRef::new(ArrowField::new(
3134                                    "key",
3135                                    ArrowDataType::Utf8,
3136                                    false,
3137                                )),
3138                                ArrowFieldRef::new(ArrowField::new(
3139                                    "value",
3140                                    ArrowDataType::Utf8,
3141                                    false,
3142                                )),
3143                                false,
3144                                false,
3145                            ),
3146                        ])),
3147                        false,
3148                    ))),
3149                    false,
3150                )),
3151                false,
3152                false,
3153            ),
3154        ]);
3155        try_matches_arrow_schema(&risingwave_schema, &arrow_schema).unwrap();
3156    }
3157
    /// End-to-end parse test for an upsert sink: feeds a full option map
    /// through `IcebergConfig::from_btreemap` and compares every resulting
    /// field against an exhaustively spelled-out expected config, then checks
    /// the fully-qualified table name.
    #[test]
    fn test_parse_iceberg_config() {
        let values = [
            ("connector", "iceberg"),
            ("type", "upsert"),
            ("primary_key", "v1"),
            // Exercises every supported partition transform in one string.
            ("partition_by", "v1, identity(v1), truncate(4,v2), bucket(5,v1), year(v3), month(v4), day(v5), hour(v6), void(v1)"),
            ("warehouse.path", "s3://iceberg"),
            ("s3.endpoint", "http://127.0.0.1:9301"),
            ("s3.access.key", "hummockadmin"),
            ("s3.secret.key", "hummockadmin"),
            ("s3.path.style.access", "true"),
            ("s3.region", "us-east-1"),
            ("catalog.type", "jdbc"),
            ("catalog.name", "demo"),
            ("catalog.uri", "jdbc://postgresql://postgres:5432/iceberg"),
            ("catalog.jdbc.user", "admin"),
            ("catalog.jdbc.password", "123456"),
            ("database.name", "demo_db"),
            ("table.name", "demo_table"),
            ("enable_compaction", "true"),
            // 1800 is expected to equal DEFAULT_ICEBERG_COMPACTION_INTERVAL / 2;
            // keep in sync with the expected value asserted below.
            ("compaction_interval_sec", "1800"),
            ("enable_snapshot_expiration", "true"),
        ]
        .into_iter()
        .map(|(k, v)| (k.to_owned(), v.to_owned()))
        .collect();

        let iceberg_config = IcebergConfig::from_btreemap(values).unwrap();

        let expected_iceberg_config = IcebergConfig {
            common: IcebergCommon {
                warehouse_path: Some("s3://iceberg".to_owned()),
                catalog_uri: Some("jdbc://postgresql://postgres:5432/iceberg".to_owned()),
                s3_region: Some("us-east-1".to_owned()),
                s3_endpoint: Some("http://127.0.0.1:9301".to_owned()),
                s3_access_key: Some("hummockadmin".to_owned()),
                s3_secret_key: Some("hummockadmin".to_owned()),
                s3_iam_role_arn: None,
                gcs_credential: None,
                catalog_type: Some("jdbc".to_owned()),
                glue_id: None,
                glue_region: None,
                glue_access_key: None,
                glue_secret_key: None,
                glue_iam_role_arn: None,
                catalog_name: Some("demo".to_owned()),
                s3_path_style_access: Some(true),
                catalog_credential: None,
                catalog_oauth2_server_uri: None,
                catalog_scope: None,
                catalog_token: None,
                enable_config_load: None,
                rest_signing_name: None,
                rest_signing_region: None,
                rest_sigv4_enabled: None,
                hosted_catalog: None,
                azblob_account_name: None,
                azblob_account_key: None,
                azblob_endpoint_url: None,
                catalog_header: None,
                adlsgen2_account_name: None,
                adlsgen2_account_key: None,
                adlsgen2_endpoint: None,
                vended_credentials: None,
                catalog_security: None,
                gcp_auth_scopes: None,
                catalog_io_impl: None,
            },
            table: IcebergTableIdentifier {
                database_name: Some("demo_db".to_owned()),
                table_name: "demo_table".to_owned(),
            },
            r#type: "upsert".to_owned(),
            force_append_only: false,
            primary_key: Some(vec!["v1".to_owned()]),
            partition_by: Some("v1, identity(v1), truncate(4,v2), bucket(5,v1), year(v3), month(v4), day(v5), hour(v6), void(v1)".to_owned()),
            // `catalog.jdbc.*` input keys are collected here with the
            // `catalog.` prefix stripped.
            java_catalog_props: [("jdbc.user", "admin"), ("jdbc.password", "123456")]
                .into_iter()
                .map(|(k, v)| (k.to_owned(), v.to_owned()))
                .collect(),
            commit_checkpoint_interval: ICEBERG_DEFAULT_COMMIT_CHECKPOINT_INTERVAL,
            create_table_if_not_exists: false,
            is_exactly_once: Some(true),
            commit_retry_num: 8,
            enable_compaction: true,
            compaction_interval_sec: Some(DEFAULT_ICEBERG_COMPACTION_INTERVAL / 2),
            enable_snapshot_expiration: true,
            write_mode: IcebergWriteMode::MergeOnRead,
            format_version: FormatVersion::V2,
            snapshot_expiration_max_age_millis: None,
            snapshot_expiration_retain_last: None,
            snapshot_expiration_clear_expired_files: true,
            snapshot_expiration_clear_expired_meta_data: true,
            max_snapshots_num_before_compaction: None,
            small_files_threshold_mb: None,
            delete_files_count_threshold: None,
            trigger_snapshot_count: None,
            target_file_size_mb: None,
            compaction_type: None,
            write_parquet_compression: None,
            write_parquet_max_row_group_rows: None,
        };

        assert_eq!(iceberg_config, expected_iceberg_config);

        // The fully-qualified name is "<database>.<table>".
        assert_eq!(
            &iceberg_config.full_table_name().unwrap().to_string(),
            "demo_db.demo_table"
        );
    }
3269
3270    async fn test_create_catalog(configs: BTreeMap<String, String>) {
3271        let iceberg_config = IcebergConfig::from_btreemap(configs).unwrap();
3272
3273        let _table = iceberg_config.load_table().await.unwrap();
3274    }
3275
3276    #[tokio::test]
3277    #[ignore]
3278    async fn test_storage_catalog() {
3279        let values = [
3280            ("connector", "iceberg"),
3281            ("type", "append-only"),
3282            ("force_append_only", "true"),
3283            ("s3.endpoint", "http://127.0.0.1:9301"),
3284            ("s3.access.key", "hummockadmin"),
3285            ("s3.secret.key", "hummockadmin"),
3286            ("s3.region", "us-east-1"),
3287            ("s3.path.style.access", "true"),
3288            ("catalog.name", "demo"),
3289            ("catalog.type", "storage"),
3290            ("warehouse.path", "s3://icebergdata/demo"),
3291            ("database.name", "s1"),
3292            ("table.name", "t1"),
3293        ]
3294        .into_iter()
3295        .map(|(k, v)| (k.to_owned(), v.to_owned()))
3296        .collect();
3297
3298        test_create_catalog(values).await;
3299    }
3300
3301    #[tokio::test]
3302    #[ignore]
3303    async fn test_rest_catalog() {
3304        let values = [
3305            ("connector", "iceberg"),
3306            ("type", "append-only"),
3307            ("force_append_only", "true"),
3308            ("s3.endpoint", "http://127.0.0.1:9301"),
3309            ("s3.access.key", "hummockadmin"),
3310            ("s3.secret.key", "hummockadmin"),
3311            ("s3.region", "us-east-1"),
3312            ("s3.path.style.access", "true"),
3313            ("catalog.name", "demo"),
3314            ("catalog.type", "rest"),
3315            ("catalog.uri", "http://192.168.167.4:8181"),
3316            ("warehouse.path", "s3://icebergdata/demo"),
3317            ("database.name", "s1"),
3318            ("table.name", "t1"),
3319        ]
3320        .into_iter()
3321        .map(|(k, v)| (k.to_owned(), v.to_owned()))
3322        .collect();
3323
3324        test_create_catalog(values).await;
3325    }
3326
3327    #[tokio::test]
3328    #[ignore]
3329    async fn test_jdbc_catalog() {
3330        let values = [
3331            ("connector", "iceberg"),
3332            ("type", "append-only"),
3333            ("force_append_only", "true"),
3334            ("s3.endpoint", "http://127.0.0.1:9301"),
3335            ("s3.access.key", "hummockadmin"),
3336            ("s3.secret.key", "hummockadmin"),
3337            ("s3.region", "us-east-1"),
3338            ("s3.path.style.access", "true"),
3339            ("catalog.name", "demo"),
3340            ("catalog.type", "jdbc"),
3341            ("catalog.uri", "jdbc:postgresql://localhost:5432/iceberg"),
3342            ("catalog.jdbc.user", "admin"),
3343            ("catalog.jdbc.password", "123456"),
3344            ("warehouse.path", "s3://icebergdata/demo"),
3345            ("database.name", "s1"),
3346            ("table.name", "t1"),
3347        ]
3348        .into_iter()
3349        .map(|(k, v)| (k.to_owned(), v.to_owned()))
3350        .collect();
3351
3352        test_create_catalog(values).await;
3353    }
3354
3355    #[tokio::test]
3356    #[ignore]
3357    async fn test_hive_catalog() {
3358        let values = [
3359            ("connector", "iceberg"),
3360            ("type", "append-only"),
3361            ("force_append_only", "true"),
3362            ("s3.endpoint", "http://127.0.0.1:9301"),
3363            ("s3.access.key", "hummockadmin"),
3364            ("s3.secret.key", "hummockadmin"),
3365            ("s3.region", "us-east-1"),
3366            ("s3.path.style.access", "true"),
3367            ("catalog.name", "demo"),
3368            ("catalog.type", "hive"),
3369            ("catalog.uri", "thrift://localhost:9083"),
3370            ("warehouse.path", "s3://icebergdata/demo"),
3371            ("database.name", "s1"),
3372            ("table.name", "t1"),
3373        ]
3374        .into_iter()
3375        .map(|(k, v)| (k.to_owned(), v.to_owned()))
3376        .collect();
3377
3378        test_create_catalog(values).await;
3379    }
3380
3381    /// Test parsing Google/BigLake authentication configuration.
3382    #[test]
3383    fn test_parse_google_auth_config() {
3384        let values: BTreeMap<String, String> = [
3385            ("connector", "iceberg"),
3386            ("type", "append-only"),
3387            ("force_append_only", "true"),
3388            ("catalog.name", "biglake-catalog"),
3389            ("catalog.type", "rest"),
3390            ("catalog.uri", "https://biglake.googleapis.com/iceberg/v1/restcatalog"),
3391            ("warehouse.path", "bq://projects/my-gcp-project"),
3392            ("catalog.header", "x-goog-user-project=my-gcp-project"),
3393            ("catalog.security", "google"),
3394            ("gcp.auth.scopes", "https://www.googleapis.com/auth/cloud-platform,https://www.googleapis.com/auth/bigquery"),
3395            ("database.name", "my_dataset"),
3396            ("table.name", "my_table"),
3397        ]
3398        .into_iter()
3399        .map(|(k, v)| (k.to_owned(), v.to_owned()))
3400        .collect();
3401
3402        let config = IcebergConfig::from_btreemap(values).unwrap();
3403        assert_eq!(config.catalog_type(), "rest");
3404        assert_eq!(config.common.catalog_security.as_deref(), Some("google"));
3405        assert_eq!(
3406            config.common.gcp_auth_scopes.as_deref(),
3407            Some(
3408                "https://www.googleapis.com/auth/cloud-platform,https://www.googleapis.com/auth/bigquery"
3409            )
3410        );
3411        assert_eq!(
3412            config.common.warehouse_path.as_deref(),
3413            Some("bq://projects/my-gcp-project")
3414        );
3415        assert_eq!(
3416            config.common.catalog_header.as_deref(),
3417            Some("x-goog-user-project=my-gcp-project")
3418        );
3419    }
3420
3421    /// Test parsing `oauth2` security configuration.
3422    #[test]
3423    fn test_parse_oauth2_security_config() {
3424        let values: BTreeMap<String, String> = [
3425            ("connector", "iceberg"),
3426            ("type", "append-only"),
3427            ("force_append_only", "true"),
3428            ("catalog.name", "oauth2-catalog"),
3429            ("catalog.type", "rest"),
3430            ("catalog.uri", "https://example.com/iceberg/rest"),
3431            ("warehouse.path", "s3://my-bucket/warehouse"),
3432            ("catalog.security", "oauth2"),
3433            ("catalog.credential", "client_id:client_secret"),
3434            ("catalog.token", "bearer-token"),
3435            (
3436                "catalog.oauth2_server_uri",
3437                "https://oauth.example.com/token",
3438            ),
3439            ("catalog.scope", "read write"),
3440            ("database.name", "test_db"),
3441            ("table.name", "test_table"),
3442        ]
3443        .into_iter()
3444        .map(|(k, v)| (k.to_owned(), v.to_owned()))
3445        .collect();
3446
3447        let iceberg_config = IcebergConfig::from_btreemap(values).unwrap();
3448
3449        // Verify catalog type
3450        assert_eq!(iceberg_config.catalog_type(), "rest");
3451
3452        // Verify OAuth2-specific options
3453        assert_eq!(
3454            iceberg_config.common.catalog_security.as_deref(),
3455            Some("oauth2")
3456        );
3457        assert_eq!(
3458            iceberg_config.common.catalog_credential.as_deref(),
3459            Some("client_id:client_secret")
3460        );
3461        assert_eq!(
3462            iceberg_config.common.catalog_token.as_deref(),
3463            Some("bearer-token")
3464        );
3465        assert_eq!(
3466            iceberg_config.common.catalog_oauth2_server_uri.as_deref(),
3467            Some("https://oauth.example.com/token")
3468        );
3469        assert_eq!(
3470            iceberg_config.common.catalog_scope.as_deref(),
3471            Some("read write")
3472        );
3473    }
3474
3475    /// Test parsing invalid security configuration.
3476    #[test]
3477    fn test_parse_invalid_security_config() {
3478        let values: BTreeMap<String, String> = [
3479            ("connector", "iceberg"),
3480            ("type", "append-only"),
3481            ("force_append_only", "true"),
3482            ("catalog.name", "invalid-catalog"),
3483            ("catalog.type", "rest"),
3484            ("catalog.uri", "https://example.com/iceberg/rest"),
3485            ("warehouse.path", "s3://my-bucket/warehouse"),
3486            ("catalog.security", "invalid_security_type"),
3487            ("database.name", "test_db"),
3488            ("table.name", "test_table"),
3489        ]
3490        .into_iter()
3491        .map(|(k, v)| (k.to_owned(), v.to_owned()))
3492        .collect();
3493
3494        // This should still parse successfully, but with a warning for unknown security type
3495        let iceberg_config = IcebergConfig::from_btreemap(values).unwrap();
3496
3497        // Verify that the invalid security type is still stored
3498        assert_eq!(
3499            iceberg_config.common.catalog_security.as_deref(),
3500            Some("invalid_security_type")
3501        );
3502
3503        // Verify catalog type
3504        assert_eq!(iceberg_config.catalog_type(), "rest");
3505    }
3506
3507    /// Test parsing custom `FileIO` implementation configuration.
3508    #[test]
3509    fn test_parse_custom_io_impl_config() {
3510        let values: BTreeMap<String, String> = [
3511            ("connector", "iceberg"),
3512            ("type", "append-only"),
3513            ("force_append_only", "true"),
3514            ("catalog.name", "gcs-catalog"),
3515            ("catalog.type", "rest"),
3516            ("catalog.uri", "https://example.com/iceberg/rest"),
3517            ("warehouse.path", "gs://my-bucket/warehouse"),
3518            ("catalog.security", "google"),
3519            ("catalog.io_impl", "org.apache.iceberg.gcp.gcs.GCSFileIO"),
3520            ("database.name", "test_db"),
3521            ("table.name", "test_table"),
3522        ]
3523        .into_iter()
3524        .map(|(k, v)| (k.to_owned(), v.to_owned()))
3525        .collect();
3526
3527        let iceberg_config = IcebergConfig::from_btreemap(values).unwrap();
3528
3529        // Verify catalog type
3530        assert_eq!(iceberg_config.catalog_type(), "rest");
3531
3532        // Verify custom `FileIO` implementation
3533        assert_eq!(
3534            iceberg_config.common.catalog_io_impl.as_deref(),
3535            Some("org.apache.iceberg.gcp.gcs.GCSFileIO")
3536        );
3537
3538        // Verify Google security is set
3539        assert_eq!(
3540            iceberg_config.common.catalog_security.as_deref(),
3541            Some("google")
3542        );
3543    }
3544
3545    #[test]
3546    fn test_config_constants_consistency() {
3547        // This test ensures our constants match the expected configuration names
3548        // If you change a constant, this test will remind you to update both places
3549        assert_eq!(ENABLE_COMPACTION, "enable_compaction");
3550        assert_eq!(COMPACTION_INTERVAL_SEC, "compaction_interval_sec");
3551        assert_eq!(ENABLE_SNAPSHOT_EXPIRATION, "enable_snapshot_expiration");
3552        assert_eq!(WRITE_MODE, "write_mode");
3553        assert_eq!(
3554            SNAPSHOT_EXPIRATION_RETAIN_LAST,
3555            "snapshot_expiration_retain_last"
3556        );
3557        assert_eq!(
3558            SNAPSHOT_EXPIRATION_MAX_AGE_MILLIS,
3559            "snapshot_expiration_max_age_millis"
3560        );
3561        assert_eq!(
3562            SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_FILES,
3563            "snapshot_expiration_clear_expired_files"
3564        );
3565        assert_eq!(
3566            SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_META_DATA,
3567            "snapshot_expiration_clear_expired_meta_data"
3568        );
3569        assert_eq!(COMPACTION_MAX_SNAPSHOTS_NUM, "compaction.max_snapshots_num");
3570    }
3571
3572    /// Test parsing all compaction.* prefix configs and their default values.
3573    #[test]
3574    fn test_parse_compaction_config() {
3575        // Test with all compaction configs specified
3576        let values: BTreeMap<String, String> = [
3577            ("connector", "iceberg"),
3578            ("type", "upsert"),
3579            ("primary_key", "id"),
3580            ("warehouse.path", "s3://iceberg"),
3581            ("s3.endpoint", "http://127.0.0.1:9301"),
3582            ("s3.access.key", "test"),
3583            ("s3.secret.key", "test"),
3584            ("s3.region", "us-east-1"),
3585            ("catalog.type", "storage"),
3586            ("catalog.name", "demo"),
3587            ("database.name", "test_db"),
3588            ("table.name", "test_table"),
3589            ("enable_compaction", "true"),
3590            ("compaction.max_snapshots_num", "100"),
3591            ("compaction.small_files_threshold_mb", "512"),
3592            ("compaction.delete_files_count_threshold", "50"),
3593            ("compaction.trigger_snapshot_count", "10"),
3594            ("compaction.target_file_size_mb", "256"),
3595            ("compaction.type", "full"),
3596            ("compaction.write_parquet_compression", "zstd"),
3597            ("compaction.write_parquet_max_row_group_rows", "50000"),
3598        ]
3599        .into_iter()
3600        .map(|(k, v)| (k.to_owned(), v.to_owned()))
3601        .collect();
3602
3603        let config = IcebergConfig::from_btreemap(values).unwrap();
3604        assert!(config.enable_compaction);
3605        assert_eq!(config.max_snapshots_num_before_compaction, Some(100));
3606        assert_eq!(config.small_files_threshold_mb, Some(512));
3607        assert_eq!(config.delete_files_count_threshold, Some(50));
3608        assert_eq!(config.trigger_snapshot_count, Some(10));
3609        assert_eq!(config.target_file_size_mb, Some(256));
3610        assert_eq!(config.compaction_type, Some(CompactionType::Full));
3611        assert_eq!(config.target_file_size_mb(), 256);
3612        assert_eq!(config.write_parquet_compression(), "zstd");
3613        assert_eq!(config.write_parquet_max_row_group_rows(), 50000);
3614
3615        // Test default values (no compaction configs specified)
3616        let values: BTreeMap<String, String> = [
3617            ("connector", "iceberg"),
3618            ("type", "append-only"),
3619            ("force_append_only", "true"),
3620            ("catalog.name", "test-catalog"),
3621            ("catalog.type", "storage"),
3622            ("warehouse.path", "s3://my-bucket/warehouse"),
3623            ("database.name", "test_db"),
3624            ("table.name", "test_table"),
3625        ]
3626        .into_iter()
3627        .map(|(k, v)| (k.to_owned(), v.to_owned()))
3628        .collect();
3629
3630        let config = IcebergConfig::from_btreemap(values).unwrap();
3631        assert_eq!(config.target_file_size_mb(), 1024); // Default
3632        assert_eq!(config.write_parquet_compression(), "zstd"); // Default
3633        assert_eq!(config.write_parquet_max_row_group_rows(), 122880); // Default
3634    }
3635
3636    /// Test parquet compression parsing.
3637    #[test]
3638    fn test_parse_parquet_compression() {
3639        use parquet::basic::Compression;
3640
3641        use super::parse_parquet_compression;
3642
3643        // Test valid compression types
3644        assert!(matches!(
3645            parse_parquet_compression("snappy"),
3646            Compression::SNAPPY
3647        ));
3648        assert!(matches!(
3649            parse_parquet_compression("gzip"),
3650            Compression::GZIP(_)
3651        ));
3652        assert!(matches!(
3653            parse_parquet_compression("zstd"),
3654            Compression::ZSTD(_)
3655        ));
3656        assert!(matches!(parse_parquet_compression("lz4"), Compression::LZ4));
3657        assert!(matches!(
3658            parse_parquet_compression("brotli"),
3659            Compression::BROTLI(_)
3660        ));
3661        assert!(matches!(
3662            parse_parquet_compression("uncompressed"),
3663            Compression::UNCOMPRESSED
3664        ));
3665
3666        // Test case insensitivity
3667        assert!(matches!(
3668            parse_parquet_compression("SNAPPY"),
3669            Compression::SNAPPY
3670        ));
3671        assert!(matches!(
3672            parse_parquet_compression("Zstd"),
3673            Compression::ZSTD(_)
3674        ));
3675
3676        // Test invalid compression (should fall back to SNAPPY)
3677        assert!(matches!(
3678            parse_parquet_compression("invalid"),
3679            Compression::SNAPPY
3680        ));
3681    }
3682
3683    #[test]
3684    fn test_append_only_rejects_copy_on_write() {
3685        // Test that append-only sinks reject copy-on-write mode
3686        let values = [
3687            ("connector", "iceberg"),
3688            ("type", "append-only"),
3689            ("warehouse.path", "s3://iceberg"),
3690            ("s3.endpoint", "http://127.0.0.1:9301"),
3691            ("s3.access.key", "test"),
3692            ("s3.secret.key", "test"),
3693            ("s3.region", "us-east-1"),
3694            ("catalog.type", "storage"),
3695            ("catalog.name", "demo"),
3696            ("database.name", "test_db"),
3697            ("table.name", "test_table"),
3698            ("write_mode", "copy-on-write"),
3699        ]
3700        .into_iter()
3701        .map(|(k, v)| (k.to_owned(), v.to_owned()))
3702        .collect();
3703
3704        let result = IcebergConfig::from_btreemap(values);
3705        assert!(result.is_err());
3706        assert!(
3707            result
3708                .unwrap_err()
3709                .to_string()
3710                .contains("'copy-on-write' mode is not supported for append-only iceberg sink")
3711        );
3712    }
3713
3714    #[test]
3715    fn test_append_only_accepts_merge_on_read() {
3716        // Test that append-only sinks accept merge-on-read mode (explicit)
3717        let values = [
3718            ("connector", "iceberg"),
3719            ("type", "append-only"),
3720            ("warehouse.path", "s3://iceberg"),
3721            ("s3.endpoint", "http://127.0.0.1:9301"),
3722            ("s3.access.key", "test"),
3723            ("s3.secret.key", "test"),
3724            ("s3.region", "us-east-1"),
3725            ("catalog.type", "storage"),
3726            ("catalog.name", "demo"),
3727            ("database.name", "test_db"),
3728            ("table.name", "test_table"),
3729            ("write_mode", "merge-on-read"),
3730        ]
3731        .into_iter()
3732        .map(|(k, v)| (k.to_owned(), v.to_owned()))
3733        .collect();
3734
3735        let result = IcebergConfig::from_btreemap(values);
3736        assert!(result.is_ok());
3737        let config = result.unwrap();
3738        assert_eq!(config.write_mode, IcebergWriteMode::MergeOnRead);
3739    }
3740
3741    #[test]
3742    fn test_append_only_defaults_to_merge_on_read() {
3743        // Test that append-only sinks default to merge-on-read when write_mode is not specified
3744        let values = [
3745            ("connector", "iceberg"),
3746            ("type", "append-only"),
3747            ("warehouse.path", "s3://iceberg"),
3748            ("s3.endpoint", "http://127.0.0.1:9301"),
3749            ("s3.access.key", "test"),
3750            ("s3.secret.key", "test"),
3751            ("s3.region", "us-east-1"),
3752            ("catalog.type", "storage"),
3753            ("catalog.name", "demo"),
3754            ("database.name", "test_db"),
3755            ("table.name", "test_table"),
3756        ]
3757        .into_iter()
3758        .map(|(k, v)| (k.to_owned(), v.to_owned()))
3759        .collect();
3760
3761        let result = IcebergConfig::from_btreemap(values);
3762        assert!(result.is_ok());
3763        let config = result.unwrap();
3764        assert_eq!(config.write_mode, IcebergWriteMode::MergeOnRead);
3765    }
3766
3767    #[test]
3768    fn test_upsert_accepts_copy_on_write() {
3769        // Test that upsert sinks accept copy-on-write mode
3770        let values = [
3771            ("connector", "iceberg"),
3772            ("type", "upsert"),
3773            ("primary_key", "id"),
3774            ("warehouse.path", "s3://iceberg"),
3775            ("s3.endpoint", "http://127.0.0.1:9301"),
3776            ("s3.access.key", "test"),
3777            ("s3.secret.key", "test"),
3778            ("s3.region", "us-east-1"),
3779            ("catalog.type", "storage"),
3780            ("catalog.name", "demo"),
3781            ("database.name", "test_db"),
3782            ("table.name", "test_table"),
3783            ("write_mode", "copy-on-write"),
3784        ]
3785        .into_iter()
3786        .map(|(k, v)| (k.to_owned(), v.to_owned()))
3787        .collect();
3788
3789        let result = IcebergConfig::from_btreemap(values);
3790        assert!(result.is_ok());
3791        let config = result.unwrap();
3792        assert_eq!(config.write_mode, IcebergWriteMode::CopyOnWrite);
3793    }
3794}