risingwave_connector/sink/iceberg/mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod prometheus;
16use std::collections::{BTreeMap, HashMap};
17use std::fmt::Debug;
18use std::num::NonZeroU64;
19use std::str::FromStr;
20use std::sync::Arc;
21use std::time::Duration;
22
23use anyhow::{Context, anyhow};
24use async_trait::async_trait;
25use await_tree::InstrumentAwait;
26use iceberg::arrow::{
27    RecordBatchPartitionSplitter, arrow_schema_to_schema, schema_to_arrow_schema,
28};
29use iceberg::spec::{
30    DataFile, MAIN_BRANCH, Operation, PartitionSpecRef, SchemaRef as IcebergSchemaRef,
31    SerializedDataFile, Transform, UnboundPartitionField, UnboundPartitionSpec,
32};
33use iceberg::table::Table;
34use iceberg::transaction::{ApplyTransactionAction, FastAppendAction, Transaction};
35use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
36use iceberg::writer::base_writer::equality_delete_writer::{
37    EqualityDeleteFileWriterBuilder, EqualityDeleteWriterConfig,
38};
39use iceberg::writer::base_writer::position_delete_file_writer::{
40    POSITION_DELETE_SCHEMA, PositionDeleteFileWriterBuilder,
41};
42use iceberg::writer::delta_writer::{DELETE_OP, DeltaWriterBuilder, INSERT_OP};
43use iceberg::writer::file_writer::ParquetWriterBuilder;
44use iceberg::writer::file_writer::location_generator::{
45    DefaultFileNameGenerator, DefaultLocationGenerator,
46};
47use iceberg::writer::file_writer::rolling_writer::RollingFileWriterBuilder;
48use iceberg::writer::task_writer::TaskWriter;
49use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
50use iceberg::{Catalog, NamespaceIdent, TableCreation, TableIdent};
51use itertools::Itertools;
52use parquet::file::properties::WriterProperties;
53use prometheus::monitored_general_writer::MonitoredGeneralWriterBuilder;
54use regex::Regex;
55use risingwave_common::array::arrow::arrow_array_iceberg::{Int32Array, RecordBatch};
56use risingwave_common::array::arrow::arrow_schema_iceberg::{
57    self, DataType as ArrowDataType, Field as ArrowField, Fields as ArrowFields,
58    Schema as ArrowSchema, SchemaRef,
59};
60use risingwave_common::array::arrow::{IcebergArrowConvert, IcebergCreateTableArrowConvert};
61use risingwave_common::array::{Op, StreamChunk};
62use risingwave_common::bail;
63use risingwave_common::bitmap::Bitmap;
64use risingwave_common::catalog::{Field, Schema};
65use risingwave_common::error::IcebergError;
66use risingwave_common::metrics::{LabelGuardedHistogram, LabelGuardedIntCounter};
67use risingwave_common_estimate_size::EstimateSize;
68use risingwave_pb::connector_service::SinkMetadata;
69use risingwave_pb::connector_service::sink_metadata::Metadata::Serialized;
70use risingwave_pb::connector_service::sink_metadata::SerializedMetadata;
71use serde::{Deserialize, Serialize};
72use serde_json::from_value;
73use serde_with::{DisplayFromStr, serde_as};
74use thiserror_ext::AsReport;
75use tokio::sync::mpsc::UnboundedSender;
76use tokio_retry::RetryIf;
77use tokio_retry::strategy::{ExponentialBackoff, jitter};
78use tracing::warn;
79use url::Url;
80use uuid::Uuid;
81use with_options::WithOptions;
82
83use super::decouple_checkpoint_log_sink::iceberg_default_commit_checkpoint_interval;
84use super::{
85    GLOBAL_SINK_METRICS, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT, Sink,
86    SinkError, SinkWriterParam,
87};
88use crate::connector_common::{IcebergCommon, IcebergSinkCompactionUpdate, IcebergTableIdentifier};
89use crate::enforce_secret::EnforceSecret;
90use crate::sink::catalog::SinkId;
91use crate::sink::coordinate::CoordinatedLogSinker;
92use crate::sink::writer::SinkWriter;
93use crate::sink::{
94    Result, SinglePhaseCommitCoordinator, SinkCommitCoordinator, SinkParam,
95    TwoPhaseCommitCoordinator,
96};
97use crate::{deserialize_bool_from_string, deserialize_optional_string_seq_from_string};
98
99pub const ICEBERG_SINK: &str = "iceberg";
100
101pub const ICEBERG_COW_BRANCH: &str = "ingestion";
102pub const ICEBERG_WRITE_MODE_MERGE_ON_READ: &str = "merge-on-read";
103pub const ICEBERG_WRITE_MODE_COPY_ON_WRITE: &str = "copy-on-write";
104pub const ICEBERG_COMPACTION_TYPE_FULL: &str = "full";
105pub const ICEBERG_COMPACTION_TYPE_SMALL_FILES: &str = "small-files";
106pub const ICEBERG_COMPACTION_TYPE_FILES_WITH_DELETE: &str = "files-with-delete";
107
108pub const PARTITION_DATA_ID_START: i32 = 1000;
109
110#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default, Serialize, Deserialize)]
111#[serde(rename_all = "kebab-case")]
112pub enum IcebergWriteMode {
113    #[default]
114    MergeOnRead,
115    CopyOnWrite,
116}
117
118impl IcebergWriteMode {
119    pub fn as_str(self) -> &'static str {
120        match self {
121            IcebergWriteMode::MergeOnRead => ICEBERG_WRITE_MODE_MERGE_ON_READ,
122            IcebergWriteMode::CopyOnWrite => ICEBERG_WRITE_MODE_COPY_ON_WRITE,
123        }
124    }
125}
126
127impl std::str::FromStr for IcebergWriteMode {
128    type Err = SinkError;
129
130    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
131        match s {
132            ICEBERG_WRITE_MODE_MERGE_ON_READ => Ok(IcebergWriteMode::MergeOnRead),
133            ICEBERG_WRITE_MODE_COPY_ON_WRITE => Ok(IcebergWriteMode::CopyOnWrite),
134            _ => Err(SinkError::Config(anyhow!(format!(
135                "invalid write_mode: {}, must be one of: {}, {}",
136                s, ICEBERG_WRITE_MODE_MERGE_ON_READ, ICEBERG_WRITE_MODE_COPY_ON_WRITE
137            )))),
138        }
139    }
140}
141
142impl TryFrom<&str> for IcebergWriteMode {
143    type Error = <Self as std::str::FromStr>::Err;
144
145    fn try_from(value: &str) -> std::result::Result<Self, Self::Error> {
146        value.parse()
147    }
148}
149
150impl TryFrom<String> for IcebergWriteMode {
151    type Error = <Self as std::str::FromStr>::Err;
152
153    fn try_from(value: String) -> std::result::Result<Self, Self::Error> {
154        value.as_str().parse()
155    }
156}
157
158impl std::fmt::Display for IcebergWriteMode {
159    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
160        f.write_str(self.as_str())
161    }
162}
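
// Illustrative sketch (hypothetical test module, not part of the sink API): `write_mode`
// strings from the user config round-trip through the `FromStr`/`Display` impls above.
#[cfg(test)]
mod write_mode_parse_example {
    use super::*;

    #[test]
    fn parse_and_display_round_trip() {
        let mode: IcebergWriteMode = ICEBERG_WRITE_MODE_COPY_ON_WRITE.parse().unwrap();
        assert_eq!(mode, IcebergWriteMode::CopyOnWrite);
        assert_eq!(mode.to_string(), ICEBERG_WRITE_MODE_COPY_ON_WRITE);
        // Unknown values are rejected with a config error.
        assert!("append-only".parse::<IcebergWriteMode>().is_err());
    }
}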
163
164// Configuration constants
165pub const ENABLE_COMPACTION: &str = "enable_compaction";
166pub const COMPACTION_INTERVAL_SEC: &str = "compaction_interval_sec";
167pub const ENABLE_SNAPSHOT_EXPIRATION: &str = "enable_snapshot_expiration";
168pub const WRITE_MODE: &str = "write_mode";
169pub const SNAPSHOT_EXPIRATION_RETAIN_LAST: &str = "snapshot_expiration_retain_last";
170pub const SNAPSHOT_EXPIRATION_MAX_AGE_MILLIS: &str = "snapshot_expiration_max_age_millis";
171pub const SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_FILES: &str = "snapshot_expiration_clear_expired_files";
172pub const SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_META_DATA: &str =
173    "snapshot_expiration_clear_expired_meta_data";
174pub const COMPACTION_MAX_SNAPSHOTS_NUM: &str = "compaction.max_snapshots_num";
175
176pub const COMPACTION_SMALL_FILES_THRESHOLD_MB: &str = "compaction.small_files_threshold_mb";
177
178pub const COMPACTION_DELETE_FILES_COUNT_THRESHOLD: &str = "compaction.delete_files_count_threshold";
179
180pub const COMPACTION_TRIGGER_SNAPSHOT_COUNT: &str = "compaction.trigger_snapshot_count";
181
182pub const COMPACTION_TARGET_FILE_SIZE_MB: &str = "compaction.target_file_size_mb";
183
184pub const COMPACTION_TYPE: &str = "compaction.type";
185
186fn default_commit_retry_num() -> u32 {
187    8
188}
189
190fn default_iceberg_write_mode() -> IcebergWriteMode {
191    IcebergWriteMode::MergeOnRead
192}
193
194fn default_true() -> bool {
195    true
196}
197
198fn default_some_true() -> Option<bool> {
199    Some(true)
200}
201
/// Compaction type for the Iceberg sink.
203#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
204#[serde(rename_all = "kebab-case")]
205pub enum CompactionType {
206    /// Full compaction - rewrites all data files
207    #[default]
208    Full,
209    /// Small files compaction - only compact small files
210    SmallFiles,
211    /// Files with delete compaction - only compact files that have associated delete files
212    FilesWithDelete,
213}
214
215impl CompactionType {
216    pub fn as_str(&self) -> &'static str {
217        match self {
218            CompactionType::Full => ICEBERG_COMPACTION_TYPE_FULL,
219            CompactionType::SmallFiles => ICEBERG_COMPACTION_TYPE_SMALL_FILES,
220            CompactionType::FilesWithDelete => ICEBERG_COMPACTION_TYPE_FILES_WITH_DELETE,
221        }
222    }
223}
224
225impl std::str::FromStr for CompactionType {
226    type Err = SinkError;
227
228    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
229        match s {
230            ICEBERG_COMPACTION_TYPE_FULL => Ok(CompactionType::Full),
231            ICEBERG_COMPACTION_TYPE_SMALL_FILES => Ok(CompactionType::SmallFiles),
232            ICEBERG_COMPACTION_TYPE_FILES_WITH_DELETE => Ok(CompactionType::FilesWithDelete),
233            _ => Err(SinkError::Config(anyhow!(format!(
234                "invalid compaction_type: {}, must be one of: {}, {}, {}",
235                s,
236                ICEBERG_COMPACTION_TYPE_FULL,
237                ICEBERG_COMPACTION_TYPE_SMALL_FILES,
238                ICEBERG_COMPACTION_TYPE_FILES_WITH_DELETE
239            )))),
240        }
241    }
242}
243
244impl TryFrom<&str> for CompactionType {
245    type Error = <Self as std::str::FromStr>::Err;
246
247    fn try_from(value: &str) -> std::result::Result<Self, Self::Error> {
248        value.parse()
249    }
250}
251
252impl TryFrom<String> for CompactionType {
253    type Error = <Self as std::str::FromStr>::Err;
254
255    fn try_from(value: String) -> std::result::Result<Self, Self::Error> {
256        value.as_str().parse()
257    }
258}
259
260impl std::fmt::Display for CompactionType {
261    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
262        write!(f, "{}", self.as_str())
263    }
264}
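
// Illustrative sketch (hypothetical test module): `compaction.type` values map onto
// `CompactionType` via the `FromStr` impl above, with `full` as the default.
#[cfg(test)]
mod compaction_type_parse_example {
    use super::*;

    #[test]
    fn parse_and_default() {
        assert_eq!(
            ICEBERG_COMPACTION_TYPE_SMALL_FILES
                .parse::<CompactionType>()
                .unwrap(),
            CompactionType::SmallFiles
        );
        assert_eq!(CompactionType::default(), CompactionType::Full);
        assert!("minor".parse::<CompactionType>().is_err());
    }
}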
265
266#[serde_as]
267#[derive(Debug, Clone, PartialEq, Eq, Deserialize, WithOptions)]
268pub struct IcebergConfig {
    pub r#type: String, // accepts "append-only" or "upsert"
270
271    #[serde(default, deserialize_with = "deserialize_bool_from_string")]
272    pub force_append_only: bool,
273
274    #[serde(flatten)]
275    common: IcebergCommon,
276
277    #[serde(flatten)]
278    table: IcebergTableIdentifier,
279
280    #[serde(
281        rename = "primary_key",
282        default,
283        deserialize_with = "deserialize_optional_string_seq_from_string"
284    )]
285    pub primary_key: Option<Vec<String>>,
286
    // Properties forwarded to the Java catalog implementation.
288    #[serde(skip)]
289    pub java_catalog_props: HashMap<String, String>,
290
291    #[serde(default)]
292    pub partition_by: Option<String>,
293
    /// Commit every n (> 0) checkpoints; defaults to 60.
295    #[serde(default = "iceberg_default_commit_checkpoint_interval")]
296    #[serde_as(as = "DisplayFromStr")]
297    #[with_option(allow_alter_on_fly)]
298    pub commit_checkpoint_interval: u64,
299
300    #[serde(default, deserialize_with = "deserialize_bool_from_string")]
301    pub create_table_if_not_exists: bool,
302
    /// Whether to commit with exactly-once semantics; defaults to `true`.
304    #[serde(default = "default_some_true")]
305    #[serde_as(as = "Option<DisplayFromStr>")]
306    pub is_exactly_once: Option<bool>,
    // Number of retries when an Iceberg commit fails; defaults to 8.
    // # TODO
    // The Iceberg table may store the commit retry count in its table metadata.
    // Prefer that value as the default when it is available.
311    #[serde(default = "default_commit_retry_num")]
312    pub commit_retry_num: u32,
313
314    /// Whether to enable iceberg compaction.
315    #[serde(
316        rename = "enable_compaction",
317        default,
318        deserialize_with = "deserialize_bool_from_string"
319    )]
320    #[with_option(allow_alter_on_fly)]
321    pub enable_compaction: bool,
322
    /// The interval (in seconds) between Iceberg compaction runs.
324    #[serde(rename = "compaction_interval_sec", default)]
325    #[serde_as(as = "Option<DisplayFromStr>")]
326    #[with_option(allow_alter_on_fly)]
327    pub compaction_interval_sec: Option<u64>,
328
    /// Whether to enable Iceberg snapshot expiration.
330    #[serde(
331        rename = "enable_snapshot_expiration",
332        default,
333        deserialize_with = "deserialize_bool_from_string"
334    )]
335    #[with_option(allow_alter_on_fly)]
336    pub enable_snapshot_expiration: bool,
337
    /// The Iceberg write mode: either `merge-on-read` or `copy-on-write`.
339    #[serde(rename = "write_mode", default = "default_iceberg_write_mode")]
340    pub write_mode: IcebergWriteMode,
341
    /// The maximum age (in milliseconds) of snapshots before they expire.
    /// For example, if set to 3600000, snapshots older than one hour are expired.
344    #[serde(rename = "snapshot_expiration_max_age_millis", default)]
345    #[serde_as(as = "Option<DisplayFromStr>")]
346    #[with_option(allow_alter_on_fly)]
347    pub snapshot_expiration_max_age_millis: Option<i64>,
348
    /// The number of most recent snapshots to retain.
350    #[serde(rename = "snapshot_expiration_retain_last", default)]
351    #[serde_as(as = "Option<DisplayFromStr>")]
352    #[with_option(allow_alter_on_fly)]
353    pub snapshot_expiration_retain_last: Option<i32>,
354
355    #[serde(
356        rename = "snapshot_expiration_clear_expired_files",
357        default = "default_true",
358        deserialize_with = "deserialize_bool_from_string"
359    )]
360    #[with_option(allow_alter_on_fly)]
361    pub snapshot_expiration_clear_expired_files: bool,
362
363    #[serde(
364        rename = "snapshot_expiration_clear_expired_meta_data",
365        default = "default_true",
366        deserialize_with = "deserialize_bool_from_string"
367    )]
368    #[with_option(allow_alter_on_fly)]
369    pub snapshot_expiration_clear_expired_meta_data: bool,
370
    /// The maximum number of snapshots allowed since the last rewrite operation.
    /// If set, the sink checks the snapshot count and waits when it is exceeded.
373    #[serde(rename = "compaction.max_snapshots_num", default)]
374    #[serde_as(as = "Option<DisplayFromStr>")]
375    #[with_option(allow_alter_on_fly)]
376    pub max_snapshots_num_before_compaction: Option<usize>,
377
378    #[serde(rename = "compaction.small_files_threshold_mb", default)]
379    #[serde_as(as = "Option<DisplayFromStr>")]
380    #[with_option(allow_alter_on_fly)]
381    pub small_files_threshold_mb: Option<u64>,
382
383    #[serde(rename = "compaction.delete_files_count_threshold", default)]
384    #[serde_as(as = "Option<DisplayFromStr>")]
385    #[with_option(allow_alter_on_fly)]
386    pub delete_files_count_threshold: Option<usize>,
387
388    #[serde(rename = "compaction.trigger_snapshot_count", default)]
389    #[serde_as(as = "Option<DisplayFromStr>")]
390    #[with_option(allow_alter_on_fly)]
391    pub trigger_snapshot_count: Option<usize>,
392
393    #[serde(rename = "compaction.target_file_size_mb", default)]
394    #[serde_as(as = "Option<DisplayFromStr>")]
395    #[with_option(allow_alter_on_fly)]
396    pub target_file_size_mb: Option<u64>,
397
    /// Compaction type: `full`, `small-files`, or `files-with-delete`.
    /// Defaults to `full` when not set.
400    #[serde(rename = "compaction.type", default)]
401    #[with_option(allow_alter_on_fly)]
402    pub compaction_type: Option<CompactionType>,
403}
404
405impl EnforceSecret for IcebergConfig {
406    fn enforce_secret<'a>(
407        prop_iter: impl Iterator<Item = &'a str>,
408    ) -> crate::error::ConnectorResult<()> {
409        for prop in prop_iter {
410            IcebergCommon::enforce_one(prop)?;
411        }
412        Ok(())
413    }
414
415    fn enforce_one(prop: &str) -> crate::error::ConnectorResult<()> {
416        IcebergCommon::enforce_one(prop)
417    }
418}
419
420impl IcebergConfig {
421    pub fn from_btreemap(values: BTreeMap<String, String>) -> Result<Self> {
422        let mut config =
423            serde_json::from_value::<IcebergConfig>(serde_json::to_value(&values).unwrap())
424                .map_err(|e| SinkError::Config(anyhow!(e)))?;
425
426        if config.r#type != SINK_TYPE_APPEND_ONLY && config.r#type != SINK_TYPE_UPSERT {
427            return Err(SinkError::Config(anyhow!(
                "`{}` must be {} or {}",
429                SINK_TYPE_OPTION,
430                SINK_TYPE_APPEND_ONLY,
431                SINK_TYPE_UPSERT
432            )));
433        }
434
435        if config.r#type == SINK_TYPE_UPSERT {
436            if let Some(primary_key) = &config.primary_key {
437                if primary_key.is_empty() {
438                    return Err(SinkError::Config(anyhow!(
                        "`primary_key` must not be empty in {}",
440                        SINK_TYPE_UPSERT
441                    )));
442                }
443            } else {
444                return Err(SinkError::Config(anyhow!(
                    "Must set `primary_key` in {}",
446                    SINK_TYPE_UPSERT
447                )));
448            }
449        }
450
        // All options starting with "catalog." (except the reserved ones excluded below)
        // are treated as Java catalog properties.
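        // For example (illustrative keys): given `catalog.type = 'rest'`,
        // `catalog.uri = 'http://...'` and `catalog.io-impl = '...'`, only `io-impl`
        // is forwarded here; `catalog.type` and `catalog.uri` are consumed by
        // `IcebergCommon`. The leading "catalog." prefix (8 characters) is stripped
        // before the key is handed to the Java catalog.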
452        config.java_catalog_props = values
453            .iter()
454            .filter(|(k, _v)| {
455                k.starts_with("catalog.")
456                    && k != &"catalog.uri"
457                    && k != &"catalog.type"
458                    && k != &"catalog.name"
459                    && k != &"catalog.header"
460            })
461            .map(|(k, v)| (k[8..].to_string(), v.clone()))
462            .collect();
463
464        if config.commit_checkpoint_interval == 0 {
465            return Err(SinkError::Config(anyhow!(
                "`commit_checkpoint_interval` must be greater than 0"
467            )));
468        }
469
470        Ok(config)
471    }
472
473    pub fn catalog_type(&self) -> &str {
474        self.common.catalog_type()
475    }
476
477    pub async fn load_table(&self) -> Result<Table> {
478        self.common
479            .load_table(&self.table, &self.java_catalog_props)
480            .await
481            .map_err(Into::into)
482    }
483
484    pub async fn create_catalog(&self) -> Result<Arc<dyn Catalog>> {
485        self.common
486            .create_catalog(&self.java_catalog_props)
487            .await
488            .map_err(Into::into)
489    }
490
491    pub fn full_table_name(&self) -> Result<TableIdent> {
492        self.table.to_table_ident().map_err(Into::into)
493    }
494
495    pub fn catalog_name(&self) -> String {
496        self.common.catalog_name()
497    }
498
499    pub fn compaction_interval_sec(&self) -> u64 {
        // Defaults to 1 hour.
501        self.compaction_interval_sec.unwrap_or(3600)
502    }
503
    /// Calculates the timestamp (in milliseconds) before which snapshots should be expired.
    /// Returns `current_time_ms - max_age_millis`.
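    ///
    /// For example, `current_time_ms = 1_700_000_000_000` with
    /// `snapshot_expiration_max_age_millis = 3_600_000` (one hour) yields
    /// `1_699_996_400_000`.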
506    pub fn snapshot_expiration_timestamp_ms(&self, current_time_ms: i64) -> Option<i64> {
507        self.snapshot_expiration_max_age_millis
508            .map(|max_age_millis| current_time_ms - max_age_millis)
509    }
510
511    pub fn trigger_snapshot_count(&self) -> usize {
512        self.trigger_snapshot_count.unwrap_or(16)
513    }
514
515    pub fn small_files_threshold_mb(&self) -> u64 {
516        self.small_files_threshold_mb.unwrap_or(64)
517    }
518
519    pub fn delete_files_count_threshold(&self) -> usize {
520        self.delete_files_count_threshold.unwrap_or(256)
521    }
522
523    pub fn target_file_size_mb(&self) -> u64 {
524        self.target_file_size_mb.unwrap_or(1024)
525    }
526
    /// Returns the configured compaction type, falling back to `CompactionType::Full`
    /// when it is not set.
529    pub fn compaction_type(&self) -> CompactionType {
530        self.compaction_type.unwrap_or_default()
531    }
532}
533
534pub struct IcebergSink {
535    pub config: IcebergConfig,
536    param: SinkParam,
    // In upsert mode, this is always `Some` and non-empty.
538    unique_column_ids: Option<Vec<usize>>,
539}
540
541impl EnforceSecret for IcebergSink {
542    fn enforce_secret<'a>(
543        prop_iter: impl Iterator<Item = &'a str>,
544    ) -> crate::error::ConnectorResult<()> {
545        for prop in prop_iter {
546            IcebergConfig::enforce_one(prop)?;
547        }
548        Ok(())
549    }
550}
551
552impl TryFrom<SinkParam> for IcebergSink {
553    type Error = SinkError;
554
555    fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
556        let config = IcebergConfig::from_btreemap(param.properties.clone())?;
557        IcebergSink::new(config, param)
558    }
559}
560
561impl Debug for IcebergSink {
562    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
563        f.debug_struct("IcebergSink")
564            .field("config", &self.config)
565            .finish()
566    }
567}
568
569async fn create_and_validate_table_impl(
570    config: &IcebergConfig,
571    param: &SinkParam,
572) -> Result<Table> {
573    if config.create_table_if_not_exists {
574        create_table_if_not_exists_impl(config, param).await?;
575    }
576
577    let table = config
578        .load_table()
579        .await
580        .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
581
582    let sink_schema = param.schema();
583    let iceberg_arrow_schema = schema_to_arrow_schema(table.metadata().current_schema())
584        .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
585
586    try_matches_arrow_schema(&sink_schema, &iceberg_arrow_schema)
587        .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
588
589    Ok(table)
590}
591
592async fn create_table_if_not_exists_impl(config: &IcebergConfig, param: &SinkParam) -> Result<()> {
593    let catalog = config.create_catalog().await?;
594    let namespace = if let Some(database_name) = config.table.database_name() {
595        let namespace = NamespaceIdent::new(database_name.to_owned());
596        if !catalog
597            .namespace_exists(&namespace)
598            .await
599            .map_err(|e| SinkError::Iceberg(anyhow!(e)))?
600        {
601            catalog
602                .create_namespace(&namespace, HashMap::default())
603                .await
604                .map_err(|e| SinkError::Iceberg(anyhow!(e)))
605                .context("failed to create iceberg namespace")?;
606        }
607        namespace
608    } else {
609        bail!("database name must be set if you want to create table")
610    };
611
612    let table_id = config
613        .full_table_name()
614        .context("Unable to parse table name")?;
615    if !catalog
616        .table_exists(&table_id)
617        .await
618        .map_err(|e| SinkError::Iceberg(anyhow!(e)))?
619    {
620        let iceberg_create_table_arrow_convert = IcebergCreateTableArrowConvert::default();
621        // convert risingwave schema -> arrow schema -> iceberg schema
622        let arrow_fields = param
623            .columns
624            .iter()
625            .map(|column| {
626                Ok(iceberg_create_table_arrow_convert
627                    .to_arrow_field(&column.name, &column.data_type)
628                    .map_err(|e| SinkError::Iceberg(anyhow!(e)))
629                    .context(format!(
630                        "failed to convert {}: {} to arrow type",
631                        &column.name, &column.data_type
632                    ))?)
633            })
634            .collect::<Result<Vec<ArrowField>>>()?;
635        let arrow_schema = arrow_schema_iceberg::Schema::new(arrow_fields);
636        let iceberg_schema = iceberg::arrow::arrow_schema_to_schema(&arrow_schema)
637            .map_err(|e| SinkError::Iceberg(anyhow!(e)))
638            .context("failed to convert arrow schema to iceberg schema")?;
639
640        let location = {
641            let mut names = namespace.clone().inner();
642            names.push(config.table.table_name().to_owned());
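            // Illustrative example: with `warehouse_path = "s3://bucket/warehouse"`,
            // database "db" and table "t", the derived location is
            // "s3://bucket/warehouse/db/t".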
643            match &config.common.warehouse_path {
644                Some(warehouse_path) => {
645                    let is_s3_tables = warehouse_path.starts_with("arn:aws:s3tables");
646                    let url = Url::parse(warehouse_path);
647                    if url.is_err() || is_s3_tables {
                        // For a REST catalog, `warehouse_path` may be a warehouse name
                        // rather than a URL. In that case, leave the location unset and
                        // let the catalog decide where to place the table.
650                        if config.common.catalog_type() == "rest"
651                            || config.common.catalog_type() == "rest_rust"
652                        {
653                            None
654                        } else {
655                            bail!(format!("Invalid warehouse path: {}", warehouse_path))
656                        }
657                    } else if warehouse_path.ends_with('/') {
658                        Some(format!("{}{}", warehouse_path, names.join("/")))
659                    } else {
660                        Some(format!("{}/{}", warehouse_path, names.join("/")))
661                    }
662                }
663                None => None,
664            }
665        };
666
667        let partition_spec = match &config.partition_by {
668            Some(partition_by) => {
669                let mut partition_fields = Vec::<UnboundPartitionField>::new();
670                for (i, (column, transform)) in parse_partition_by_exprs(partition_by.clone())?
671                    .into_iter()
672                    .enumerate()
673                {
674                    match iceberg_schema.field_id_by_name(&column) {
675                        Some(id) => partition_fields.push(
676                            UnboundPartitionField::builder()
677                                .source_id(id)
678                                .transform(transform)
679                                .name(format!("_p_{}", column))
680                                .field_id(PARTITION_DATA_ID_START + i as i32)
681                                .build(),
682                        ),
683                        None => bail!(format!(
684                            "Partition source column does not exist in schema: {}",
685                            column
686                        )),
687                    };
688                }
689                Some(
690                    UnboundPartitionSpec::builder()
691                        .with_spec_id(0)
692                        .add_partition_fields(partition_fields)
693                        .map_err(|e| SinkError::Iceberg(anyhow!(e)))
694                        .context("failed to add partition columns")?
695                        .build(),
696                )
697            }
698            None => None,
699        };
700
701        let table_creation_builder = TableCreation::builder()
702            .name(config.table.table_name().to_owned())
703            .schema(iceberg_schema);
704
705        let table_creation = match (location, partition_spec) {
706            (Some(location), Some(partition_spec)) => table_creation_builder
707                .location(location)
708                .partition_spec(partition_spec)
709                .build(),
710            (Some(location), None) => table_creation_builder.location(location).build(),
711            (None, Some(partition_spec)) => table_creation_builder
712                .partition_spec(partition_spec)
713                .build(),
714            (None, None) => table_creation_builder.build(),
715        };
716
717        catalog
718            .create_table(&namespace, table_creation)
719            .await
720            .map_err(|e| SinkError::Iceberg(anyhow!(e)))
721            .context("failed to create iceberg table")?;
722    }
723    Ok(())
724}
725
726impl IcebergSink {
727    pub async fn create_and_validate_table(&self) -> Result<Table> {
728        create_and_validate_table_impl(&self.config, &self.param).await
729    }
730
731    pub async fn create_table_if_not_exists(&self) -> Result<()> {
732        create_table_if_not_exists_impl(&self.config, &self.param).await
733    }
734
735    pub fn new(config: IcebergConfig, param: SinkParam) -> Result<Self> {
736        let unique_column_ids = if config.r#type == SINK_TYPE_UPSERT && !config.force_append_only {
737            if let Some(pk) = &config.primary_key {
738                let mut unique_column_ids = Vec::with_capacity(pk.len());
739                for col_name in pk {
740                    let id = param
741                        .columns
742                        .iter()
743                        .find(|col| col.name.as_str() == col_name)
744                        .ok_or_else(|| {
745                            SinkError::Config(anyhow!(
746                                "Primary key column {} not found in sink schema",
747                                col_name
748                            ))
749                        })?
750                        .column_id
751                        .get_id() as usize;
752                    unique_column_ids.push(id);
753                }
754                Some(unique_column_ids)
755            } else {
756                unreachable!()
757            }
758        } else {
759            None
760        };
761        Ok(Self {
762            config,
763            param,
764            unique_column_ids,
765        })
766    }
767}
768
769impl Sink for IcebergSink {
770    type LogSinker = CoordinatedLogSinker<IcebergSinkWriter>;
771
772    const SINK_NAME: &'static str = ICEBERG_SINK;
773
774    async fn validate(&self) -> Result<()> {
775        if "snowflake".eq_ignore_ascii_case(self.config.catalog_type()) {
776            bail!("Snowflake catalog only supports iceberg sources");
777        }
778
779        if "glue".eq_ignore_ascii_case(self.config.catalog_type()) {
780            risingwave_common::license::Feature::IcebergSinkWithGlue
781                .check_available()
782                .map_err(|e| anyhow::anyhow!(e))?;
783        }
784
785        // Validate compaction type configuration
786        let compaction_type = self.config.compaction_type();
787
788        // Check COW mode constraints
789        // COW mode only supports 'full' compaction type
790        if self.config.write_mode == IcebergWriteMode::CopyOnWrite
791            && compaction_type != CompactionType::Full
792        {
793            bail!(
794                "'copy-on-write' mode only supports 'full' compaction type, got: '{}'",
795                compaction_type
796            );
797        }
798
799        match compaction_type {
800            CompactionType::SmallFiles => {
801                // 1. check license
802                risingwave_common::license::Feature::IcebergCompaction
803                    .check_available()
804                    .map_err(|e| anyhow::anyhow!(e))?;
805
806                // 2. check write mode
807                if self.config.write_mode != IcebergWriteMode::MergeOnRead {
808                    bail!(
809                        "'small-files' compaction type only supports 'merge-on-read' write mode, got: '{}'",
810                        self.config.write_mode
811                    );
812                }
813
814                // 3. check conflicting parameters
815                if self.config.delete_files_count_threshold.is_some() {
816                    bail!(
                        "`compaction.delete_files_count_threshold` is not supported for 'small-files' compaction type"
818                    );
819                }
820            }
821            CompactionType::FilesWithDelete => {
822                // 1. check license
823                risingwave_common::license::Feature::IcebergCompaction
824                    .check_available()
825                    .map_err(|e| anyhow::anyhow!(e))?;
826
827                // 2. check write mode
828                if self.config.write_mode != IcebergWriteMode::MergeOnRead {
829                    bail!(
830                        "'files-with-delete' compaction type only supports 'merge-on-read' write mode, got: '{}'",
831                        self.config.write_mode
832                    );
833                }
834
835                // 3. check conflicting parameters
836                if self.config.small_files_threshold_mb.is_some() {
837                    bail!(
                        "`compaction.small_files_threshold_mb` must not be set for 'files-with-delete' compaction type"
839                    );
840                }
841            }
842            CompactionType::Full => {
843                // Full compaction has no special requirements
844            }
845        }
846
847        let _ = self.create_and_validate_table().await?;
848        Ok(())
849    }
850
851    fn support_schema_change() -> bool {
852        true
853    }
854
855    fn validate_alter_config(config: &BTreeMap<String, String>) -> Result<()> {
856        let iceberg_config = IcebergConfig::from_btreemap(config.clone())?;
857
858        // Validate compaction interval
859        if let Some(compaction_interval) = iceberg_config.compaction_interval_sec {
860            if iceberg_config.enable_compaction && compaction_interval == 0 {
861                bail!(
                    "`compaction_interval_sec` must be greater than 0 when `enable_compaction` is true"
863                );
864            }
865
866            // log compaction_interval
867            tracing::info!(
868                "Alter config compaction_interval set to {} seconds",
869                compaction_interval
870            );
871        }
872
873        // Validate max snapshots
874        if let Some(max_snapshots) = iceberg_config.max_snapshots_num_before_compaction
875            && max_snapshots < 1
876        {
877            bail!(
                "`compaction.max_snapshots_num` must be greater than 0, got: {}",
879                max_snapshots
880            );
881        }
882
883        Ok(())
884    }
885
886    async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
887        let writer = IcebergSinkWriter::new(
888            self.config.clone(),
889            self.param.clone(),
890            writer_param.clone(),
891            self.unique_column_ids.clone(),
892        );
893
894        let commit_checkpoint_interval =
895            NonZeroU64::new(self.config.commit_checkpoint_interval).expect(
                "commit_checkpoint_interval must be greater than 0; this is validated when the config is parsed",
897            );
898        let log_sinker = CoordinatedLogSinker::new(
899            &writer_param,
900            self.param.clone(),
901            writer,
902            commit_checkpoint_interval,
903        )
904        .await?;
905
906        Ok(log_sinker)
907    }
908
909    fn is_coordinated_sink(&self) -> bool {
910        true
911    }
912
913    async fn new_coordinator(
914        &self,
915        iceberg_compact_stat_sender: Option<UnboundedSender<IcebergSinkCompactionUpdate>>,
916    ) -> Result<SinkCommitCoordinator> {
917        let catalog = self.config.create_catalog().await?;
918        let table = self.create_and_validate_table().await?;
919        let coordinator = IcebergSinkCommitter {
920            catalog,
921            table,
922            last_commit_epoch: 0,
923            sink_id: self.param.sink_id,
924            config: self.config.clone(),
925            param: self.param.clone(),
926            commit_retry_num: self.config.commit_retry_num,
927            iceberg_compact_stat_sender,
928        };
929        if self.config.is_exactly_once.unwrap_or_default() {
930            Ok(SinkCommitCoordinator::TwoPhase(Box::new(coordinator)))
931        } else {
932            Ok(SinkCommitCoordinator::SinglePhase(Box::new(coordinator)))
933        }
934    }
935}
936
/// `None` means no projection is needed.
/// `Prepare` holds the index of the extra partition column.
/// `Done` holds the resolved projection index vector.
///
/// `ProjectIdxVec` is evaluated lazily: the first time we see the `Prepare` state, the
/// data chunk schema is used to build the projection index vector.
943enum ProjectIdxVec {
944    None,
945    Prepare(usize),
946    Done(Vec<usize>),
947}
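
// A minimal sketch (hypothetical test module) of the projection-index computation that
// `write_batch` performs when it hits `ProjectIdxVec::Prepare`: every column except the
// extra partition column is kept.
#[cfg(test)]
mod project_idx_example {
    #[test]
    fn skip_extra_partition_column() {
        let extra_partition_col_idx = 2;
        let num_columns = 5;
        let project_idx_vec: Vec<usize> = (0..extra_partition_col_idx)
            .chain(extra_partition_col_idx + 1..num_columns)
            .collect();
        assert_eq!(project_idx_vec, vec![0, 1, 3, 4]);
    }
}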
948
949type DataFileWriterBuilderType =
950    DataFileWriterBuilder<ParquetWriterBuilder, DefaultLocationGenerator, DefaultFileNameGenerator>;
951type PositionDeleteFileWriterBuilderType = PositionDeleteFileWriterBuilder<
952    ParquetWriterBuilder,
953    DefaultLocationGenerator,
954    DefaultFileNameGenerator,
955>;
956type EqualityDeleteFileWriterBuilderType = EqualityDeleteFileWriterBuilder<
957    ParquetWriterBuilder,
958    DefaultLocationGenerator,
959    DefaultFileNameGenerator,
960>;
961
962#[derive(Clone)]
963struct TaskWriterBuilderWrapper<B: IcebergWriterBuilder> {
964    inner: B,
965    fanout_enabled: bool,
966    schema: IcebergSchemaRef,
967    partition_spec: PartitionSpecRef,
968    compute_partition: bool,
969}
970
971impl<B: IcebergWriterBuilder> TaskWriterBuilderWrapper<B> {
972    fn new(
973        inner: B,
974        fanout_enabled: bool,
975        schema: IcebergSchemaRef,
976        partition_spec: PartitionSpecRef,
977        compute_partition: bool,
978    ) -> Self {
979        Self {
980            inner,
981            fanout_enabled,
982            schema,
983            partition_spec,
984            compute_partition,
985        }
986    }
987
988    fn build(self) -> iceberg::Result<TaskWriter<B>> {
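        // Splitter selection, in brief (mirrors the match below):
        //   - unpartitioned table                -> no partition splitter
        //   - partitioned + compute_partition    -> splitter computes partition values from the rows
        //   - partitioned + precomputed values   -> partition values are expected in the batch already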
989        let partition_splitter = match (
990            self.partition_spec.is_unpartitioned(),
991            self.compute_partition,
992        ) {
993            (true, _) => None,
994            (false, true) => Some(RecordBatchPartitionSplitter::new_with_computed_values(
995                self.schema.clone(),
996                self.partition_spec.clone(),
997            )?),
998            (false, false) => Some(RecordBatchPartitionSplitter::new_with_precomputed_values(
999                self.schema.clone(),
1000                self.partition_spec.clone(),
1001            )?),
1002        };
1003
1004        Ok(TaskWriter::new_with_partition_splitter(
1005            self.inner,
1006            self.fanout_enabled,
1007            self.schema,
1008            self.partition_spec,
1009            partition_splitter,
1010        ))
1011    }
1012}
1013
1014pub enum IcebergSinkWriter {
1015    Created(IcebergSinkWriterArgs),
1016    Initialized(IcebergSinkWriterInner),
1017}
1018
1019pub struct IcebergSinkWriterArgs {
1020    config: IcebergConfig,
1021    sink_param: SinkParam,
1022    writer_param: SinkWriterParam,
1023    unique_column_ids: Option<Vec<usize>>,
1024}
1025
1026pub struct IcebergSinkWriterInner {
1027    writer: IcebergWriterDispatch,
1028    arrow_schema: SchemaRef,
    // See the comments on `IcebergWriterMetrics` below.
    metrics: IcebergWriterMetrics,
    // State of the Iceberg table for this writer.
    table: Table,
    // For chunks carrying an extra partition column, that column must be removed before
    // writing. The projection index vector is cached here so it is not rebuilt per chunk.
    project_idx_vec: ProjectIdxVec,
1036}
1037
1038#[allow(clippy::type_complexity)]
1039enum IcebergWriterDispatch {
1040    Append {
1041        writer: Option<Box<dyn IcebergWriter>>,
1042        writer_builder:
1043            TaskWriterBuilderWrapper<MonitoredGeneralWriterBuilder<DataFileWriterBuilderType>>,
1044    },
1045    Upsert {
1046        writer: Option<Box<dyn IcebergWriter>>,
1047        writer_builder: TaskWriterBuilderWrapper<
1048            MonitoredGeneralWriterBuilder<
1049                DeltaWriterBuilder<
1050                    DataFileWriterBuilderType,
1051                    PositionDeleteFileWriterBuilderType,
1052                    EqualityDeleteFileWriterBuilderType,
1053                >,
1054            >,
1055        >,
1056        arrow_schema_with_op_column: SchemaRef,
1057    },
1058}
1059
1060impl IcebergWriterDispatch {
1061    pub fn get_writer(&mut self) -> Option<&mut Box<dyn IcebergWriter>> {
1062        match self {
1063            IcebergWriterDispatch::Append { writer, .. }
1064            | IcebergWriterDispatch::Upsert { writer, .. } => writer.as_mut(),
1065        }
1066    }
1067}
1068
1069pub struct IcebergWriterMetrics {
    // NOTE: These two metrics are not used directly here; they are kept only for lifecycle
    // management. They are actually used in `PrometheusWriterBuilder`:
    //     WriterMetrics::new(write_qps.deref().clone(), write_latency.deref().clone())
    // Keeping the guards here ensures the labels are removed from the metrics registry on drop.
1074    _write_qps: LabelGuardedIntCounter,
1075    _write_latency: LabelGuardedHistogram,
1076    write_bytes: LabelGuardedIntCounter,
1077}
1078
1079impl IcebergSinkWriter {
1080    pub fn new(
1081        config: IcebergConfig,
1082        sink_param: SinkParam,
1083        writer_param: SinkWriterParam,
1084        unique_column_ids: Option<Vec<usize>>,
1085    ) -> Self {
1086        Self::Created(IcebergSinkWriterArgs {
1087            config,
1088            sink_param,
1089            writer_param,
1090            unique_column_ids,
1091        })
1092    }
1093}
1094
1095impl IcebergSinkWriterInner {
1096    fn build_append_only(table: Table, writer_param: &SinkWriterParam) -> Result<Self> {
1097        let SinkWriterParam {
1098            extra_partition_col_idx,
1099            actor_id,
1100            sink_id,
1101            sink_name,
1102            ..
1103        } = writer_param;
1104        let metrics_labels = [
1105            &actor_id.to_string(),
1106            &sink_id.to_string(),
1107            sink_name.as_str(),
1108        ];
1109
1110        // Metrics
1111        let write_qps = GLOBAL_SINK_METRICS
1112            .iceberg_write_qps
1113            .with_guarded_label_values(&metrics_labels);
1114        let write_latency = GLOBAL_SINK_METRICS
1115            .iceberg_write_latency
1116            .with_guarded_label_values(&metrics_labels);
1117        // # TODO
        // Unused. Add this metric later.
1119        let _rolling_unflushed_data_file = GLOBAL_SINK_METRICS
1120            .iceberg_rolling_unflushed_data_file
1121            .with_guarded_label_values(&metrics_labels);
1122        let write_bytes = GLOBAL_SINK_METRICS
1123            .iceberg_write_bytes
1124            .with_guarded_label_values(&metrics_labels);
1125
1126        let schema = table.metadata().current_schema();
1127        let partition_spec = table.metadata().default_partition_spec();
1128        let fanout_enabled = !partition_spec.fields().is_empty();
1129
        // To avoid duplicate file names, each sink instance generates a unique UUID as a
        // file name suffix.
1131        let unique_uuid_suffix = Uuid::now_v7();
1132
1133        let parquet_writer_properties = WriterProperties::builder()
1134            .set_max_row_group_size(
1135                writer_param
1136                    .streaming_config
1137                    .developer
1138                    .iceberg_sink_write_parquet_max_row_group_rows,
1139            )
1140            .build();
1141
1142        let parquet_writer_builder =
1143            ParquetWriterBuilder::new(parquet_writer_properties, schema.clone());
1144        let rolling_builder = RollingFileWriterBuilder::new_with_default_file_size(
1145            parquet_writer_builder,
1146            table.file_io().clone(),
1147            DefaultLocationGenerator::new(table.metadata().clone())
1148                .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1149            DefaultFileNameGenerator::new(
1150                writer_param.actor_id.to_string(),
1151                Some(unique_uuid_suffix.to_string()),
1152                iceberg::spec::DataFileFormat::Parquet,
1153            ),
1154        );
1155        let data_file_builder = DataFileWriterBuilder::new(rolling_builder);
1156        let monitored_builder = MonitoredGeneralWriterBuilder::new(
1157            data_file_builder,
1158            write_qps.clone(),
1159            write_latency.clone(),
1160        );
1161        let writer_builder = TaskWriterBuilderWrapper::new(
1162            monitored_builder,
1163            fanout_enabled,
1164            schema.clone(),
1165            partition_spec.clone(),
1166            true,
1167        );
1168        let inner_writer = Some(Box::new(
1169            writer_builder
1170                .clone()
1171                .build()
1172                .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1173        ) as Box<dyn IcebergWriter>);
1174        Ok(Self {
1175            arrow_schema: Arc::new(
1176                schema_to_arrow_schema(table.metadata().current_schema())
1177                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1178            ),
1179            metrics: IcebergWriterMetrics {
1180                _write_qps: write_qps,
1181                _write_latency: write_latency,
1182                write_bytes,
1183            },
1184            writer: IcebergWriterDispatch::Append {
1185                writer: inner_writer,
1186                writer_builder,
1187            },
1188            table,
1189            project_idx_vec: {
1190                if let Some(extra_partition_col_idx) = extra_partition_col_idx {
1191                    ProjectIdxVec::Prepare(*extra_partition_col_idx)
1192                } else {
1193                    ProjectIdxVec::None
1194                }
1195            },
1196        })
1197    }
1198
1199    fn build_upsert(
1200        table: Table,
1201        unique_column_ids: Vec<usize>,
1202        writer_param: &SinkWriterParam,
1203    ) -> Result<Self> {
1204        let SinkWriterParam {
1205            extra_partition_col_idx,
1206            actor_id,
1207            sink_id,
1208            sink_name,
1209            ..
1210        } = writer_param;
1211        let metrics_labels = [
1212            &actor_id.to_string(),
1213            &sink_id.to_string(),
1214            sink_name.as_str(),
1215        ];
1216        let unique_column_ids: Vec<_> = unique_column_ids.into_iter().map(|id| id as i32).collect();
1217
1218        // Metrics
1219        let write_qps = GLOBAL_SINK_METRICS
1220            .iceberg_write_qps
1221            .with_guarded_label_values(&metrics_labels);
1222        let write_latency = GLOBAL_SINK_METRICS
1223            .iceberg_write_latency
1224            .with_guarded_label_values(&metrics_labels);
1225        // # TODO
        // Unused. Add this metric later.
1227        let _rolling_unflushed_data_file = GLOBAL_SINK_METRICS
1228            .iceberg_rolling_unflushed_data_file
1229            .with_guarded_label_values(&metrics_labels);
1230        let write_bytes = GLOBAL_SINK_METRICS
1231            .iceberg_write_bytes
1232            .with_guarded_label_values(&metrics_labels);
1233
        // Resolve the current schema and the default partition spec.
1235        let schema = table.metadata().current_schema();
1236        let partition_spec = table.metadata().default_partition_spec();
1237        let fanout_enabled = !partition_spec.fields().is_empty();
1238
        // To avoid duplicate file names, each sink instance generates a unique UUID as a
        // file name suffix.
1240        let unique_uuid_suffix = Uuid::now_v7();
1241
1242        let parquet_writer_properties = WriterProperties::builder()
1243            .set_max_row_group_size(
1244                writer_param
1245                    .streaming_config
1246                    .developer
1247                    .iceberg_sink_write_parquet_max_row_group_rows,
1248            )
1249            .build();
1250
1251        let data_file_builder = {
1252            let parquet_writer_builder =
1253                ParquetWriterBuilder::new(parquet_writer_properties.clone(), schema.clone());
1254            let rolling_writer_builder = RollingFileWriterBuilder::new_with_default_file_size(
1255                parquet_writer_builder,
1256                table.file_io().clone(),
1257                DefaultLocationGenerator::new(table.metadata().clone())
1258                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1259                DefaultFileNameGenerator::new(
1260                    writer_param.actor_id.to_string(),
1261                    Some(unique_uuid_suffix.to_string()),
1262                    iceberg::spec::DataFileFormat::Parquet,
1263                ),
1264            );
1265            DataFileWriterBuilder::new(rolling_writer_builder)
1266        };
1267        let position_delete_builder = {
1268            let parquet_writer_builder = ParquetWriterBuilder::new(
1269                parquet_writer_properties.clone(),
1270                POSITION_DELETE_SCHEMA.clone().into(),
1271            );
1272            let rolling_writer_builder = RollingFileWriterBuilder::new_with_default_file_size(
1273                parquet_writer_builder,
1274                table.file_io().clone(),
1275                DefaultLocationGenerator::new(table.metadata().clone())
1276                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1277                DefaultFileNameGenerator::new(
1278                    writer_param.actor_id.to_string(),
1279                    Some(format!("pos-del-{}", unique_uuid_suffix)),
1280                    iceberg::spec::DataFileFormat::Parquet,
1281                ),
1282            );
1283            PositionDeleteFileWriterBuilder::new(rolling_writer_builder)
1284        };
1285        let equality_delete_builder = {
1286            let config = EqualityDeleteWriterConfig::new(
1287                unique_column_ids.clone(),
1288                table.metadata().current_schema().clone(),
1289            )
1290            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
1291            let parquet_writer_builder = ParquetWriterBuilder::new(
1292                parquet_writer_properties,
1293                Arc::new(
1294                    arrow_schema_to_schema(config.projected_arrow_schema_ref())
1295                        .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1296                ),
1297            );
1298            let rolling_writer_builder = RollingFileWriterBuilder::new_with_default_file_size(
1299                parquet_writer_builder,
1300                table.file_io().clone(),
1301                DefaultLocationGenerator::new(table.metadata().clone())
1302                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1303                DefaultFileNameGenerator::new(
1304                    writer_param.actor_id.to_string(),
1305                    Some(format!("eq-del-{}", unique_uuid_suffix)),
1306                    iceberg::spec::DataFileFormat::Parquet,
1307                ),
1308            );
1309
1310            EqualityDeleteFileWriterBuilder::new(rolling_writer_builder, config)
1311        };
1312        let delta_builder = DeltaWriterBuilder::new(
1313            data_file_builder,
1314            position_delete_builder,
1315            equality_delete_builder,
1316            unique_column_ids,
1317            schema.clone(),
1318        );
1319        let original_arrow_schema = Arc::new(
1320            schema_to_arrow_schema(table.metadata().current_schema())
1321                .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1322        );
1323        let schema_with_extra_op_column = {
1324            let mut new_fields = original_arrow_schema.fields().iter().cloned().collect_vec();
1325            new_fields.push(Arc::new(ArrowField::new(
1326                "op".to_owned(),
1327                ArrowDataType::Int32,
1328                false,
1329            )));
1330            Arc::new(ArrowSchema::new(new_fields))
1331        };
1332        let writer_builder = TaskWriterBuilderWrapper::new(
1333            MonitoredGeneralWriterBuilder::new(
1334                delta_builder,
1335                write_qps.clone(),
1336                write_latency.clone(),
1337            ),
1338            fanout_enabled,
1339            schema.clone(),
1340            partition_spec.clone(),
1341            true,
1342        );
1343        let inner_writer = Some(Box::new(
1344            writer_builder
1345                .clone()
1346                .build()
1347                .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1348        ) as Box<dyn IcebergWriter>);
1349        Ok(Self {
1350            arrow_schema: original_arrow_schema,
1351            metrics: IcebergWriterMetrics {
1352                _write_qps: write_qps,
1353                _write_latency: write_latency,
1354                write_bytes,
1355            },
1356            table,
1357            writer: IcebergWriterDispatch::Upsert {
1358                writer: inner_writer,
1359                writer_builder,
1360                arrow_schema_with_op_column: schema_with_extra_op_column,
1361            },
1362            project_idx_vec: {
1363                if let Some(extra_partition_col_idx) = extra_partition_col_idx {
1364                    ProjectIdxVec::Prepare(*extra_partition_col_idx)
1365                } else {
1366                    ProjectIdxVec::None
1367                }
1368            },
1369        })
1370    }
1371}
1372
1373#[async_trait]
1374impl SinkWriter for IcebergSinkWriter {
1375    type CommitMetadata = Option<SinkMetadata>;
1376
1377    /// Begin a new epoch
1378    async fn begin_epoch(&mut self, _epoch: u64) -> Result<()> {
1379        let Self::Created(args) = self else {
1380            return Ok(());
1381        };
1382
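        // On the first `begin_epoch`, load (and, if configured, create) the Iceberg table,
        // then build the actual writer; `IcebergSinkWriter::new` itself stays cheap and
        // infallible.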
1383        let table = create_and_validate_table_impl(&args.config, &args.sink_param).await?;
1384        let inner = match &args.unique_column_ids {
1385            Some(unique_column_ids) => IcebergSinkWriterInner::build_upsert(
1386                table,
1387                unique_column_ids.clone(),
1388                &args.writer_param,
1389            )?,
1390            None => IcebergSinkWriterInner::build_append_only(table, &args.writer_param)?,
1391        };
1392
1393        *self = IcebergSinkWriter::Initialized(inner);
1394        Ok(())
1395    }
1396
    /// Write a stream chunk to the sink.
1398    async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> {
1399        let Self::Initialized(inner) = self else {
            unreachable!("IcebergSinkWriter should be initialized in `begin_epoch` before `write_batch`");
1401        };
1402
        // Build the inner writer if it has not been created yet.
1404        match &mut inner.writer {
1405            IcebergWriterDispatch::Append {
1406                writer,
1407                writer_builder,
1408            } => {
1409                if writer.is_none() {
1410                    *writer = Some(Box::new(
1411                        writer_builder
1412                            .clone()
1413                            .build()
1414                            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1415                    ));
1416                }
1417            }
1418            IcebergWriterDispatch::Upsert {
1419                writer,
1420                writer_builder,
1421                ..
1422            } => {
1423                if writer.is_none() {
1424                    *writer = Some(Box::new(
1425                        writer_builder
1426                            .clone()
1427                            .build()
1428                            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1429                    ));
1430                }
1431            }
1432        };
1433
1434        // Process the chunk.
1435        let (mut chunk, ops) = chunk.compact_vis().into_parts();
1436        match &mut inner.project_idx_vec {
1437            ProjectIdxVec::None => {}
1438            ProjectIdxVec::Prepare(idx) => {
1439                if *idx >= chunk.columns().len() {
1440                    return Err(SinkError::Iceberg(anyhow!(
1441                        "invalid extra partition column index {}",
1442                        idx
1443                    )));
1444                }
1445                let project_idx_vec = (0..*idx)
1446                    .chain(*idx + 1..chunk.columns().len())
1447                    .collect_vec();
1448                chunk = chunk.project(&project_idx_vec);
1449                inner.project_idx_vec = ProjectIdxVec::Done(project_idx_vec);
1450            }
1451            ProjectIdxVec::Done(idx_vec) => {
1452                chunk = chunk.project(idx_vec);
1453            }
1454        }
1455        if ops.is_empty() {
1456            return Ok(());
1457        }
1458        let write_batch_size = chunk.estimated_heap_size();
1459        let batch = match &inner.writer {
1460            IcebergWriterDispatch::Append { .. } => {
1461                // Keep only the insert rows for the append-only writer.
1462                let filters =
1463                    chunk.visibility() & ops.iter().map(|op| *op == Op::Insert).collect::<Bitmap>();
1464                chunk.set_visibility(filters);
1465                IcebergArrowConvert
1466                    .to_record_batch(inner.arrow_schema.clone(), &chunk.compact_vis())
1467                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?
1468            }
1469            IcebergWriterDispatch::Upsert {
1470                arrow_schema_with_op_column,
1471                ..
1472            } => {
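                // For upsert sinks, append an extra Int32 op column (INSERT_OP / DELETE_OP) so the
                // delta writer can distinguish inserts from deletes within a single record batch.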
1473                let chunk = IcebergArrowConvert
1474                    .to_record_batch(inner.arrow_schema.clone(), &chunk)
1475                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
1476                let ops = Arc::new(Int32Array::from(
1477                    ops.iter()
1478                        .map(|op| match op {
1479                            Op::UpdateInsert | Op::Insert => INSERT_OP,
1480                            Op::UpdateDelete | Op::Delete => DELETE_OP,
1481                        })
1482                        .collect_vec(),
1483                ));
1484                let mut columns = chunk.columns().to_vec();
1485                columns.push(ops);
1486                RecordBatch::try_new(arrow_schema_with_op_column.clone(), columns)
1487                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?
1488            }
1489        };
1490
1491        let writer = inner
            .writer
            .get_writer()
            .expect("writer should have been built above");
1492        writer
1493            .write(batch)
1494            .instrument_await("iceberg_write")
1495            .await
1496            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
1497        inner.metrics.write_bytes.inc_by(write_batch_size as _);
1498        Ok(())
1499    }
1500
1501    /// Receive a barrier and mark the end of current epoch. When `is_checkpoint` is true, the sink
1502    /// writer should commit the current epoch.
1503    async fn barrier(&mut self, is_checkpoint: bool) -> Result<Option<SinkMetadata>> {
1504        let Self::Initialized(inner) = self else {
1505            unreachable!("IcebergSinkWriter should be initialized before barrier");
1506        };
1507
1508        // Skip if this barrier is not a checkpoint.
1509        if !is_checkpoint {
1510            return Ok(None);
1511        }
1512
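        // Close the current writer to flush its files and collect the resulting data files,
        // then eagerly rebuild a fresh writer for the next epoch.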
1513        let close_result = match &mut inner.writer {
1514            IcebergWriterDispatch::Append {
1515                writer,
1516                writer_builder,
1517            } => {
1518                let close_result = match writer.take() {
1519                    Some(mut writer) => {
1520                        Some(writer.close().instrument_await("iceberg_close").await)
1521                    }
1522                    _ => None,
1523                };
1524                match writer_builder.clone().build() {
1525                    Ok(new_writer) => {
1526                        *writer = Some(Box::new(new_writer));
1527                    }
1528                    _ => {
1529                        // The old writer has been closed but building a new one failed. We can't return the error
1530                        // here because the close above may have succeeded, so we just log the error.
1531                        warn!("Failed to build new writer after close");
1532                    }
1533                }
1534                close_result
1535            }
1536            IcebergWriterDispatch::Upsert {
1537                writer,
1538                writer_builder,
1539                ..
1540            } => {
1541                let close_result = match writer.take() {
1542                    Some(mut writer) => {
1543                        Some(writer.close().instrument_await("iceberg_close").await)
1544                    }
1545                    _ => None,
1546                };
1547                match writer_builder.clone().build() {
1548                    Ok(new_writer) => {
1549                        *writer = Some(Box::new(new_writer));
1550                    }
1551                    _ => {
1552                        // The old writer has been closed but building a new one failed. We can't return the error
1553                        // here because the close above may have succeeded, so we just log the error.
1554                        warn!("Failed to build new writer after close");
1555                    }
1556                }
1557                close_result
1558            }
1559        };
1560
1561        match close_result {
1562            Some(Ok(result)) => {
1563                let format_version = inner.table.metadata().format_version();
1564                let partition_type = inner.table.metadata().default_partition_type();
1565                let data_files = result
1566                    .into_iter()
1567                    .map(|f| {
1568                        // Truncate large column statistics BEFORE serialization
1569                        let truncated = truncate_datafile(f);
1570                        SerializedDataFile::try_from(truncated, partition_type, format_version)
1571                            .map_err(|err| SinkError::Iceberg(anyhow!(err)))
1572                    })
1573                    .collect::<Result<Vec<_>>>()?;
1574                Ok(Some(SinkMetadata::try_from(&IcebergCommitResult {
1575                    data_files,
1576                    schema_id: inner.table.metadata().current_schema_id(),
1577                    partition_spec_id: inner.table.metadata().default_partition_spec_id(),
1578                })?))
1579            }
1580            Some(Err(err)) => Err(SinkError::Iceberg(anyhow!(err))),
1581            None => Err(SinkError::Iceberg(anyhow!("No writer to close"))),
1582        }
1583    }
1584}
1585
1586const SCHEMA_ID: &str = "schema_id";
1587const PARTITION_SPEC_ID: &str = "partition_spec_id";
1588const DATA_FILES: &str = "data_files";
1589
1590/// Maximum size for column statistics (min/max values) in bytes.
1591/// Column statistics larger than this will be truncated to avoid metadata bloat.
1592/// This is especially important for large fields like JSONB, TEXT, BINARY, etc.
1593///
1594/// Oversized column statistics in `DataFile` metadata can cause OOM errors, so we truncate
1595/// them at the `DataFile` level (before serialization) by directly modifying the public
1596/// `lower_bounds` and `upper_bounds` fields.
1597///
1598/// This prevents metadata from ballooning to gigabytes when dealing with large
1599/// JSONB, TEXT, or BINARY fields, while still preserving statistics for small fields
1600/// that benefit from query optimization.
1601const MAX_COLUMN_STAT_SIZE: usize = 10240; // 10KB
1602
1603/// Truncate large column statistics from `DataFile` BEFORE serialization.
1604///
1605/// This function directly modifies `DataFile`'s `lower_bounds` and `upper_bounds`
1606/// to remove entries that exceed `MAX_COLUMN_STAT_SIZE`.
1607///
1608/// # Arguments
1609/// * `data_file` - A `DataFile` to process
1610///
1611/// # Returns
1612/// The modified `DataFile` with large statistics truncated
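///
/// # Example
///
/// A minimal sketch of the effect (assuming a hypothetical `DataFile` `f` carrying an
/// oversized JSONB bound for field 7 and a small bound for field 3; not compiled):
///
/// ```ignore
/// let f = truncate_datafile(f);
/// assert!(!f.lower_bounds.contains_key(&7)); // oversized statistic dropped
/// assert!(f.lower_bounds.contains_key(&3));  // small statistic preserved
/// ```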
1613fn truncate_datafile(mut data_file: DataFile) -> DataFile {
1614    // Process lower_bounds - remove entries with large values
1615    data_file.lower_bounds.retain(|field_id, datum| {
1616        // Use to_bytes() to get the actual binary size without JSON serialization overhead
1617        let size = match datum.to_bytes() {
1618            Ok(bytes) => bytes.len(),
1619            Err(_) => 0,
1620        };
1621
1622        if size > MAX_COLUMN_STAT_SIZE {
1623            tracing::debug!(
1624                field_id = field_id,
1625                size = size,
1626                "Truncating large lower_bound statistic"
1627            );
1628            return false;
1629        }
1630        true
1631    });
1632
1633    // Process upper_bounds - remove entries with large values
1634    data_file.upper_bounds.retain(|field_id, datum| {
1635        // Use to_bytes() to get the actual binary size without JSON serialization overhead
1636        let size = match datum.to_bytes() {
1637            Ok(bytes) => bytes.len(),
1638            Err(_) => 0,
1639        };
1640
1641        if size > MAX_COLUMN_STAT_SIZE {
1642            tracing::debug!(
1643                field_id = field_id,
1644                size = size,
1645                "Truncating large upper_bound statistic"
1646            );
1647            return false;
1648        }
1649        true
1650    });
1651
1652    data_file
1653}
1654
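/// Per-writer commit payload produced at checkpoint time: the serialized data files written in
/// the epoch plus the schema id and partition spec id they were written against. It round-trips
/// through `SinkMetadata` (or raw bytes) so the commit coordinator can reassemble and commit it.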
1655#[derive(Default, Clone)]
1656struct IcebergCommitResult {
1657    schema_id: i32,
1658    partition_spec_id: i32,
1659    data_files: Vec<SerializedDataFile>,
1660}
1661
1662impl IcebergCommitResult {
1663    fn try_from(value: &SinkMetadata) -> Result<Self> {
1664        if let Some(Serialized(v)) = &value.metadata {
1665            let mut values = if let serde_json::Value::Object(v) =
1666                serde_json::from_slice::<serde_json::Value>(&v.metadata)
1667                    .context("Can't parse iceberg sink metadata")?
1668            {
1669                v
1670            } else {
1671                bail!("iceberg sink metadata should be an object");
1672            };
1673
1674            let schema_id;
1675            if let Some(serde_json::Value::Number(value)) = values.remove(SCHEMA_ID) {
1676                schema_id = value
1677                    .as_u64()
1678                    .ok_or_else(|| anyhow!("schema_id should be a u64"))?;
1679            } else {
1680                bail!("iceberg sink metadata should have schema_id");
1681            }
1682
1683            let partition_spec_id;
1684            if let Some(serde_json::Value::Number(value)) = values.remove(PARTITION_SPEC_ID) {
1685                partition_spec_id = value
1686                    .as_u64()
1687                    .ok_or_else(|| anyhow!("partition_spec_id should be a u64"))?;
1688            } else {
1689                bail!("iceberg sink metadata should have partition_spec_id");
1690            }
1691
1692            let data_files: Vec<SerializedDataFile>;
1693            if let serde_json::Value::Array(values) = values
1694                .remove(DATA_FILES)
1695                .ok_or_else(|| anyhow!("iceberg sink metadata should have data_files object"))?
1696            {
1697                data_files = values
1698                    .into_iter()
1699                    .map(from_value::<SerializedDataFile>)
1700                    .collect::<std::result::Result<_, _>>()
1701                    .context("Can't parse data files from json")?;
1702            } else {
1703                bail!("iceberg sink metadata should have data_files object");
1704            }
1705
1706            Ok(Self {
1707                schema_id: schema_id as i32,
1708                partition_spec_id: partition_spec_id as i32,
1709                data_files,
1710            })
1711        } else {
1712            bail!("Can't create iceberg sink write result from empty data!")
1713        }
1714    }
1715
1716    fn try_from_serialized_bytes(value: Vec<u8>) -> Result<Self> {
1717        let mut values = if let serde_json::Value::Object(value) =
1718            serde_json::from_slice::<serde_json::Value>(&value)
1719                .context("Can't parse iceberg sink metadata")?
1720        {
1721            value
1722        } else {
1723            bail!("iceberg sink metadata should be an object");
1724        };
1725
1726        let schema_id;
1727        if let Some(serde_json::Value::Number(value)) = values.remove(SCHEMA_ID) {
1728            schema_id = value
1729                .as_u64()
1730                .ok_or_else(|| anyhow!("schema_id should be a u64"))?;
1731        } else {
1732            bail!("iceberg sink metadata should have schema_id");
1733        }
1734
1735        let partition_spec_id;
1736        if let Some(serde_json::Value::Number(value)) = values.remove(PARTITION_SPEC_ID) {
1737            partition_spec_id = value
1738                .as_u64()
1739                .ok_or_else(|| anyhow!("partition_spec_id should be a u64"))?;
1740        } else {
1741            bail!("iceberg sink metadata should have partition_spec_id");
1742        }
1743
1744        let data_files: Vec<SerializedDataFile>;
1745        if let serde_json::Value::Array(values) = values
1746            .remove(DATA_FILES)
1747            .ok_or_else(|| anyhow!("iceberg sink metadata should have data_files object"))?
1748        {
1749            data_files = values
1750                .into_iter()
1751                .map(from_value::<SerializedDataFile>)
1752                .collect::<std::result::Result<_, _>>()
1753                .context("Can't parse data files from json")?;
1754        } else {
1755            bail!("iceberg sink metadata should have data_files object");
1756        }
1757
1758        Ok(Self {
1759            schema_id: schema_id as i32,
1760            partition_spec_id: partition_spec_id as i32,
1761            data_files,
1762        })
1763    }
1764}
1765
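// The serialized form produced below is a JSON object of the shape
// `{"schema_id": <i32>, "partition_spec_id": <i32>, "data_files": [<SerializedDataFile>, ...]}`,
// which is exactly what `IcebergCommitResult::try_from` / `try_from_serialized_bytes` above parse.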
1766impl<'a> TryFrom<&'a IcebergCommitResult> for SinkMetadata {
1767    type Error = SinkError;
1768
1769    fn try_from(value: &'a IcebergCommitResult) -> std::result::Result<SinkMetadata, Self::Error> {
1770        let json_data_files = serde_json::Value::Array(
1771            value
1772                .data_files
1773                .iter()
1774                .map(serde_json::to_value)
1775                .collect::<std::result::Result<Vec<serde_json::Value>, _>>()
1776                .context("Can't serialize data files to json")?,
1777        );
1778        let json_value = serde_json::Value::Object(
1779            vec![
1780                (
1781                    SCHEMA_ID.to_owned(),
1782                    serde_json::Value::Number(value.schema_id.into()),
1783                ),
1784                (
1785                    PARTITION_SPEC_ID.to_owned(),
1786                    serde_json::Value::Number(value.partition_spec_id.into()),
1787                ),
1788                (DATA_FILES.to_owned(), json_data_files),
1789            ]
1790            .into_iter()
1791            .collect(),
1792        );
1793        Ok(SinkMetadata {
1794            metadata: Some(Serialized(SerializedMetadata {
1795                metadata: serde_json::to_vec(&json_value)
1796                    .context("Can't serialize iceberg sink metadata")?,
1797            })),
1798        })
1799    }
1800}
1801
1802impl TryFrom<IcebergCommitResult> for Vec<u8> {
1803    type Error = SinkError;
1804
1805    fn try_from(value: IcebergCommitResult) -> std::result::Result<Vec<u8>, Self::Error> {
1806        let json_data_files = serde_json::Value::Array(
1807            value
1808                .data_files
1809                .iter()
1810                .map(serde_json::to_value)
1811                .collect::<std::result::Result<Vec<serde_json::Value>, _>>()
1812                .context("Can't serialize data files to json")?,
1813        );
1814        let json_value = serde_json::Value::Object(
1815            vec![
1816                (
1817                    SCHEMA_ID.to_owned(),
1818                    serde_json::Value::Number(value.schema_id.into()),
1819                ),
1820                (
1821                    PARTITION_SPEC_ID.to_owned(),
1822                    serde_json::Value::Number(value.partition_spec_id.into()),
1823                ),
1824                (DATA_FILES.to_owned(), json_data_files),
1825            ]
1826            .into_iter()
1827            .collect(),
1828        );
1829        Ok(serde_json::to_vec(&json_value).context("Can't serialize iceberg sink metadata")?)
1830    }
1831}
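
/// Commit coordinator for the iceberg sink: it gathers the per-writer commit payloads at a
/// checkpoint, converts them back into `DataFile`s, and appends them to the iceberg table,
/// either directly (single-phase) or via a pre-commit/commit protocol (two-phase, exactly-once).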
1832pub struct IcebergSinkCommitter {
1833    catalog: Arc<dyn Catalog>,
1834    table: Table,
1835    pub last_commit_epoch: u64,
1836    pub(crate) sink_id: SinkId,
1837    pub(crate) config: IcebergConfig,
1838    pub(crate) param: SinkParam,
1839    commit_retry_num: u32,
1840    pub(crate) iceberg_compact_stat_sender: Option<UnboundedSender<IcebergSinkCompactionUpdate>>,
1841}
1842
1843impl IcebergSinkCommitter {
1844    /// Reload the table and verify that its current schema id and default partition spec id
1845    /// match the given `schema_id` and `partition_spec_id`.
1846    async fn reload_table(
1847        catalog: &dyn Catalog,
1848        table_ident: &TableIdent,
1849        schema_id: i32,
1850        partition_spec_id: i32,
1851    ) -> Result<Table> {
1852        let table = catalog
1853            .load_table(table_ident)
1854            .await
1855            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
1856        if table.metadata().current_schema_id() != schema_id {
1857            return Err(SinkError::Iceberg(anyhow!(
1858                "Schema evolution not supported, expect schema id {}, but got {}",
1859                schema_id,
1860                table.metadata().current_schema_id()
1861            )));
1862        }
1863        if table.metadata().default_partition_spec_id() != partition_spec_id {
1864            return Err(SinkError::Iceberg(anyhow!(
1865                "Partition evolution not supported, expect partition spec id {}, but got {}",
1866                partition_spec_id,
1867                table.metadata().default_partition_spec_id()
1868            )));
1869        }
1870        Ok(table)
1871    }
1872}
1873
1874#[async_trait]
1875impl SinglePhaseCommitCoordinator for IcebergSinkCommitter {
1876    async fn init(&mut self) -> Result<()> {
1877        tracing::info!(
1878            "Sink id = {}: iceberg sink coordinator initializing.",
1879            self.param.sink_id
1880        );
1881
1882        Ok(())
1883    }
1884
1885    async fn commit(
1886        &mut self,
1887        epoch: u64,
1888        metadata: Vec<SinkMetadata>,
1889        add_columns: Option<Vec<Field>>,
1890    ) -> Result<()> {
1891        tracing::info!("Starting iceberg direct commit in epoch {epoch}");
1892
1893        // Commit data if present
1894        if let Some((write_results, snapshot_id)) = self.pre_commit_inner(epoch, metadata, None)? {
1895            self.commit_datafile(epoch, write_results, snapshot_id)
1896                .await?;
1897        }
1898
1899        // Commit schema change if present
1900        if let Some(add_columns) = add_columns {
1901            tracing::info!(?epoch, "Committing schema change");
1902            self.commit_schema_change(add_columns).await?;
1903        }
1904
1905        Ok(())
1906    }
1907}
1908
1909#[async_trait]
1910impl TwoPhaseCommitCoordinator for IcebergSinkCommitter {
1911    async fn init(&mut self) -> Result<()> {
1912        tracing::info!(
1913            "Sink id = {}: iceberg sink coordinator initializing.",
1914            self.param.sink_id
1915        );
1916
1917        Ok(())
1918    }
1919
1920    async fn pre_commit(
1921        &mut self,
1922        epoch: u64,
1923        metadata: Vec<SinkMetadata>,
1924        add_columns: Option<Vec<Field>>,
1925    ) -> Result<Vec<u8>> {
1926        tracing::info!("Starting iceberg pre commit in epoch {epoch}");
1927
1928        // TwoPhaseCommitCoordinator does not support schema change yet
1929        if let Some(add_columns) = &add_columns {
1930            return Err(SinkError::Iceberg(anyhow!(
1931                "TwoPhaseCommitCoordinator for Iceberg sink does not support schema change yet, \
1932                 but got add_columns: {:?}",
1933                add_columns.iter().map(|c| &c.name).collect::<Vec<_>>()
1934            )));
1935        }
1936
1937        let (write_results, snapshot_id) =
1938            match self.pre_commit_inner(epoch, metadata, add_columns)? {
1939                Some((write_results, snapshot_id)) => (write_results, snapshot_id),
1940                None => {
1941                    tracing::debug!(?epoch, "no data to commit");
1942                    return Ok(vec![]);
1943                }
1944            };
1945
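        // The pre-commit payload is the list of per-parallelism serialized `IcebergCommitResult`s
        // with the little-endian snapshot id appended as the final element; `commit` pops it back off.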
1946        let mut write_results_bytes = Vec::new();
1947        for each_parallelism_write_result in write_results {
1948            let each_parallelism_write_result_bytes: Vec<u8> =
1949                each_parallelism_write_result.try_into()?;
1950            write_results_bytes.push(each_parallelism_write_result_bytes);
1951        }
1952
1953        let snapshot_id_bytes: Vec<u8> = snapshot_id.to_le_bytes().to_vec();
1954        write_results_bytes.push(snapshot_id_bytes);
1955
1956        let pre_commit_metadata_bytes: Vec<u8> = serialize_metadata(write_results_bytes);
1957        Ok(pre_commit_metadata_bytes)
1958    }
1959
1960    async fn commit(&mut self, epoch: u64, commit_metadata: Vec<u8>) -> Result<()> {
1961        tracing::info!("Starting iceberg commit in epoch {epoch}");
1962        if commit_metadata.is_empty() {
1963            tracing::debug!(?epoch, "no data to commit");
1964            return Ok(());
1965        }
1966
1967        let mut write_results_bytes = deserialize_metadata(commit_metadata);
1968
1969        let snapshot_id_bytes = write_results_bytes.pop().ok_or_else(|| {
            SinkError::Iceberg(anyhow!("Invalid commit metadata: missing snapshot id bytes"))
        })?;
1970        let snapshot_id = i64::from_le_bytes(
1971            snapshot_id_bytes
1972                .try_into()
1973                .map_err(|_| SinkError::Iceberg(anyhow!("Invalid snapshot id bytes")))?,
1974        );
1975
1976        if self
1977            .is_snapshot_id_in_iceberg(&self.config, snapshot_id)
1978            .await?
1979        {
1980            tracing::info!(
1981                "Snapshot id {} already committed in iceberg table, skip committing again.",
1982                snapshot_id
1983            );
1984            return Ok(());
1985        }
1986
1987        let mut write_results = vec![];
1988        for each in write_results_bytes {
1989            let write_result = IcebergCommitResult::try_from_serialized_bytes(each)?;
1990            write_results.push(write_result);
1991        }
1992
1993        self.commit_datafile(epoch, write_results, snapshot_id)
1994            .await?;
1995
1996        Ok(())
1997    }
1998
1999    async fn abort(&mut self, _epoch: u64, _commit_metadata: Vec<u8>) {
2000        // TODO: Files that have been written but not committed should be deleted.
2001        tracing::debug!("Abort not implemented yet");
2002    }
2003}
2004
2005/// Methods required to achieve exactly-once semantics.
2006impl IcebergSinkCommitter {
2007    fn pre_commit_inner(
2008        &mut self,
2009        _epoch: u64,
2010        metadata: Vec<SinkMetadata>,
2011        add_columns: Option<Vec<Field>>,
2012    ) -> Result<Option<(Vec<IcebergCommitResult>, i64)>> {
2013        if let Some(add_columns) = add_columns {
2014            return Err(anyhow!(
2015                "Iceberg sink does not support adding columns, but got: {:?}",
2016                add_columns
2017            )
2018            .into());
2019        }
2020
2021        let write_results: Vec<IcebergCommitResult> = metadata
2022            .iter()
2023            .map(IcebergCommitResult::try_from)
2024            .collect::<Result<Vec<IcebergCommitResult>>>()?;
2025
2026        // Skip if no data to commit
2027        if write_results.is_empty() || write_results.iter().all(|r| r.data_files.is_empty()) {
2028            return Ok(None);
2029        }
2030
2031        let expect_schema_id = write_results[0].schema_id;
2032        let expect_partition_spec_id = write_results[0].partition_spec_id;
2033
2034        // Guarantee that all write results have the same schema_id and partition_spec_id.
2035        if write_results
2036            .iter()
2037            .any(|r| r.schema_id != expect_schema_id)
2038            || write_results
2039                .iter()
2040                .any(|r| r.partition_spec_id != expect_partition_spec_id)
2041        {
2042            return Err(SinkError::Iceberg(anyhow!(
2043                "schema_id and partition_spec_id should be the same in all write results"
2044            )));
2045        }
2046
2047        let snapshot_id = FastAppendAction::generate_snapshot_id(&self.table);
2048
2049        Ok(Some((write_results, snapshot_id)))
2050    }
2051
2052    async fn commit_datafile(
2053        &mut self,
2054        epoch: u64,
2055        write_results: Vec<IcebergCommitResult>,
2056        snapshot_id: i64,
2057    ) -> Result<()> {
2058        // Empty write results should be handled before calling this function.
2059        assert!(
2060            !write_results.is_empty() && !write_results.iter().all(|r| r.data_files.is_empty())
2061        );
2062
2063        // Check snapshot limit before proceeding with commit
2064        self.wait_for_snapshot_limit().await?;
2065
2066        let expect_schema_id = write_results[0].schema_id;
2067        let expect_partition_spec_id = write_results[0].partition_spec_id;
2068
2069        // Best effort: load the latest table to reduce conflicts from concurrent modification.
2070        self.table = Self::reload_table(
2071            self.catalog.as_ref(),
2072            self.table.identifier(),
2073            expect_schema_id,
2074            expect_partition_spec_id,
2075        )
2076        .await?;
2077
2078        let Some(schema) = self.table.metadata().schema_by_id(expect_schema_id) else {
2079            return Err(SinkError::Iceberg(anyhow!(
2080                "Can't find schema by id {}",
2081                expect_schema_id
2082            )));
2083        };
2084        let Some(partition_spec) = self
2085            .table
2086            .metadata()
2087            .partition_spec_by_id(expect_partition_spec_id)
2088        else {
2089            return Err(SinkError::Iceberg(anyhow!(
2090                "Can't find partition spec by id {}",
2091                expect_partition_spec_id
2092            )));
2093        };
2094        let partition_type = partition_spec
2095            .as_ref()
2096            .clone()
2097            .partition_type(schema)
2098            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
2099
2100        let data_files = write_results
2101            .into_iter()
2102            .flat_map(|r| {
2103                r.data_files.into_iter().map(|f| {
2104                    f.try_into(expect_partition_spec_id, &partition_type, schema)
2105                        .map_err(|err| SinkError::Iceberg(anyhow!(err)))
2106                })
2107            })
2108            .collect::<Result<Vec<DataFile>>>()?;
2109        // TODO:
2110        // This retry behavior should be reverted and handled in iceberg-rust once it supports retry
2111        // (tracked in: https://github.com/apache/iceberg-rust/issues/964), because the retry logic involves reapplying the commit metadata.
2112        // For now, we just retry the commit operation.
2113        let retry_strategy = ExponentialBackoff::from_millis(10)
2114            .max_delay(Duration::from_secs(60))
2115            .map(jitter)
2116            .take(self.commit_retry_num as usize);
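        // With `ExponentialBackoff::from_millis(10)` the per-attempt delay grows exponentially
        // from ~10ms up to the 60s cap, with jitter applied, for at most `commit_retry_num` attempts.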
2117        let catalog = self.catalog.clone();
2118        let table_ident = self.table.identifier().clone();
2119
2120        // Custom retry logic that:
2121        // 1. Calls reload_table before each commit attempt to get the latest metadata
2122        // 2. If reload_table fails (table not exists/schema/partition mismatch), stops retrying immediately
2123        // 3. If commit fails, retries with backoff
2124        enum CommitError {
2125            ReloadTable(SinkError), // Non-retriable: schema/partition mismatch
2126            Commit(SinkError),      // Retriable: commit conflicts, network errors
2127        }
2128
2129        let table = RetryIf::spawn(
2130            retry_strategy,
2131            || async {
2132                // Reload table before each commit attempt to get the latest metadata
2133                let table = Self::reload_table(
2134                    catalog.as_ref(),
2135                    &table_ident,
2136                    expect_schema_id,
2137                    expect_partition_spec_id,
2138                )
2139                .await
2140                .map_err(|e| {
2141                    tracing::error!(error = %e.as_report(), "Failed to reload iceberg table");
2142                    CommitError::ReloadTable(e)
2143                })?;
2144
2145                let txn = Transaction::new(&table);
2146                let append_action = txn
2147                    .fast_append()
2148                    .set_snapshot_id(snapshot_id)
2149                    .set_target_branch(commit_branch(
2150                        self.config.r#type.as_str(),
2151                        self.config.write_mode,
2152                    ))
2153                    .add_data_files(data_files.clone());
2154
2155                let tx = append_action.apply(txn).map_err(|err| {
2156                    let err: IcebergError = err.into();
2157                    tracing::error!(error = %err.as_report(), "Failed to apply iceberg table");
2158                    CommitError::Commit(SinkError::Iceberg(anyhow!(err)))
2159                })?;
2160
2161                tx.commit(catalog.as_ref()).await.map_err(|err| {
2162                    let err: IcebergError = err.into();
2163                    tracing::error!(error = %err.as_report(), "Failed to commit iceberg table");
2164                    CommitError::Commit(SinkError::Iceberg(anyhow!(err)))
2165                })
2166            },
2167            |err: &CommitError| {
2168                // Only retry on commit errors, not on reload_table errors
2169                match err {
2170                    CommitError::Commit(_) => {
2171                        tracing::warn!("Commit failed, will retry");
2172                        true
2173                    }
2174                    CommitError::ReloadTable(_) => {
2175                        tracing::error!(
2176                            "reload_table failed with non-retriable error, will not retry"
2177                        );
2178                        false
2179                    }
2180                }
2181            },
2182        )
2183        .await
2184        .map_err(|e| match e {
2185            CommitError::ReloadTable(e) | CommitError::Commit(e) => e,
2186        })?;
2187        self.table = table;
2188
2189        let snapshot_num = self.table.metadata().snapshots().count();
2190        let catalog_name = self.config.common.catalog_name();
2191        let table_name = self.table.identifier().to_string();
2192        let metrics_labels = [&self.param.sink_name, &catalog_name, &table_name];
2193        GLOBAL_SINK_METRICS
2194            .iceberg_snapshot_num
2195            .with_guarded_label_values(&metrics_labels)
2196            .set(snapshot_num as i64);
2197
2198        tracing::info!("Successfully committed to iceberg table in epoch {epoch}.");
2199
2200        if let Some(iceberg_compact_stat_sender) = &self.iceberg_compact_stat_sender
2201            && self.config.enable_compaction
2202            && iceberg_compact_stat_sender
2203                .send(IcebergSinkCompactionUpdate {
2204                    sink_id: self.sink_id,
2205                    compaction_interval: self.config.compaction_interval_sec(),
2206                    force_compaction: false,
2207                })
2208                .is_err()
2209        {
2210            warn!("failed to send iceberg compaction stats");
2211        }
2212
2213        Ok(())
2214    }
2215
2216    /// During pre-commit, we record the `snapshot_id` corresponding to each batch of files.
2217    /// Therefore, checking whether all files in a batch are already present in Iceberg reduces to
2218    /// checking whether their corresponding `snapshot_id` exists in the table metadata.
2219    async fn is_snapshot_id_in_iceberg(
2220        &self,
2221        iceberg_config: &IcebergConfig,
2222        snapshot_id: i64,
2223    ) -> Result<bool> {
2224        let table = iceberg_config.load_table().await?;
2225        Ok(table.metadata().snapshot_by_id(snapshot_id).is_some())
2230    }
2231
2232    /// Commit schema changes (e.g., add columns) to the iceberg table.
2233    /// This function uses Transaction API to atomically update the table schema
2234    /// with optimistic locking to prevent concurrent conflicts.
2235    async fn commit_schema_change(&mut self, add_columns: Vec<Field>) -> Result<()> {
2236        use iceberg::spec::NestedField;
2237
2238        tracing::info!(
2239            "Starting to commit schema change with {} columns",
2240            add_columns.len()
2241        );
2242
2243        // Step 1: Get current table metadata
2244        let metadata = self.table.metadata();
2245        let mut next_field_id = metadata.last_column_id() + 1;
2246        tracing::debug!("Starting schema change, next_field_id: {}", next_field_id);
2247
2248        // Step 2: Build new fields to add
2249        let iceberg_create_table_arrow_convert = IcebergCreateTableArrowConvert::default();
2250        let mut new_fields = Vec::new();
2251
2252        for field in &add_columns {
2253            // Convert RisingWave Field to Arrow Field using IcebergCreateTableArrowConvert
2254            let arrow_field = iceberg_create_table_arrow_convert
2255                .to_arrow_field(&field.name, &field.data_type)
2256                .with_context(|| format!("Failed to convert field '{}' to arrow", field.name))
2257                .map_err(SinkError::Iceberg)?;
2258
2259            // Convert Arrow DataType to Iceberg Type
2260            let iceberg_type = iceberg::arrow::arrow_type_to_type(arrow_field.data_type())
2261                .map_err(|err| {
2262                    SinkError::Iceberg(
2263                        anyhow!(err).context("Failed to convert Arrow type to Iceberg type"),
2264                    )
2265                })?;
2266
2267            // Create NestedField with the next available field ID
2268            let nested_field = Arc::new(NestedField::optional(
2269                next_field_id,
2270                &field.name,
2271                iceberg_type,
2272            ));
2273
2274            new_fields.push(nested_field);
2275            tracing::info!("Prepared field '{}' with ID {}", field.name, next_field_id);
2276            next_field_id += 1;
2277        }
2278
2279        // Step 3: Create Transaction with UpdateSchemaAction
2280        tracing::info!(
2281            "Committing schema change to catalog for table {}",
2282            self.table.identifier()
2283        );
2284
2285        let txn = Transaction::new(&self.table);
2286        let action = txn.update_schema().add_fields(new_fields);
2287
2288        let updated_table = action
2289            .apply(txn)
2290            .context("Failed to apply schema update action")
2291            .map_err(SinkError::Iceberg)?
2292            .commit(self.catalog.as_ref())
2293            .await
2294            .context("Failed to commit table schema change")
2295            .map_err(SinkError::Iceberg)?;
2296
2297        self.table = updated_table;
2298
2299        tracing::info!(
2300            "Successfully committed schema change, added {} columns to iceberg table",
2301            add_columns.len()
2302        );
2303
2304        Ok(())
2305    }
2306
2307    /// Count the number of snapshots created since the last rewrite/overwrite (`Replace`) operation.
2308    /// Used by `wait_for_snapshot_limit` to decide whether the snapshot limit has been exceeded.
2309    fn count_snapshots_since_rewrite(&self) -> usize {
2310        let mut snapshots: Vec<_> = self.table.metadata().snapshots().collect();
2311        snapshots.sort_by_key(|b| std::cmp::Reverse(b.timestamp_ms()));
2312
2313        // Walk the snapshots from newest to oldest until the most recent rewrite/overwrite is found.
2314        let mut count = 0;
2315        for snapshot in snapshots {
2316            // Check if this snapshot represents a rewrite or overwrite operation
2317            let summary = snapshot.summary();
2318            match &summary.operation {
2319                Operation::Replace => {
2320                    // Found a rewrite/overwrite operation, stop counting
2321                    break;
2322                }
2323
2324                _ => {
2325                    // Increment count for each snapshot that is not a rewrite/overwrite
2326                    count += 1;
2327                }
2328            }
2329        }
2330
2331        count
2332    }
2333
2334    /// Wait until snapshot count since last rewrite is below the limit
2335    async fn wait_for_snapshot_limit(&mut self) -> Result<()> {
2336        if !self.config.enable_compaction {
2337            return Ok(());
2338        }
2339
2340        if let Some(max_snapshots) = self.config.max_snapshots_num_before_compaction {
2341            loop {
2342                let current_count = self.count_snapshots_since_rewrite();
2343
2344                if current_count < max_snapshots {
2345                    tracing::info!(
2346                        "Snapshot count check passed: {} < {}",
2347                        current_count,
2348                        max_snapshots
2349                    );
2350                    break;
2351                }
2352
2353                tracing::info!(
2354                    "Snapshot count {} exceeds limit {}, waiting...",
2355                    current_count,
2356                    max_snapshots
2357                );
2358
2359                if let Some(iceberg_compact_stat_sender) = &self.iceberg_compact_stat_sender
2360                    && iceberg_compact_stat_sender
2361                        .send(IcebergSinkCompactionUpdate {
2362                            sink_id: self.sink_id,
2363                            compaction_interval: self.config.compaction_interval_sec(),
2364                            force_compaction: true,
2365                        })
2366                        .is_err()
2367                {
2368                    tracing::warn!("failed to send iceberg compaction stats");
2369                }
2370
2371                // Wait for 30 seconds before checking again
2372                tokio::time::sleep(Duration::from_secs(30)).await;
2373
2374                // Reload table to get latest snapshots
2375                self.table = self.config.load_table().await?;
2376            }
2377        }
2378        Ok(())
2379    }
2380}
2381
2382const MAP_KEY: &str = "key";
2383const MAP_VALUE: &str = "value";
2384
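/// Drill into a nested arrow type and, once an arrow struct is reached, record the corresponding
/// RisingWave field name -> `DataType` entries into `schema_fields` and return the arrow struct's
/// fields for the compatibility check. Returns `None` for types without nested struct fields.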
2385fn get_fields<'a>(
2386    our_field_type: &'a risingwave_common::types::DataType,
2387    data_type: &ArrowDataType,
2388    schema_fields: &mut HashMap<&'a str, &'a risingwave_common::types::DataType>,
2389) -> Option<ArrowFields> {
2390    match data_type {
2391        ArrowDataType::Struct(fields) => {
2392            match our_field_type {
2393                risingwave_common::types::DataType::Struct(struct_fields) => {
2394                    struct_fields.iter().for_each(|(name, data_type)| {
2395                        let res = schema_fields.insert(name, data_type);
2396                        // This assert is to make sure there is no duplicate field name in the schema.
2397                        assert!(res.is_none())
2398                    });
2399                }
2400                risingwave_common::types::DataType::Map(map_fields) => {
2401                    schema_fields.insert(MAP_KEY, map_fields.key());
2402                    schema_fields.insert(MAP_VALUE, map_fields.value());
2403                }
2404                risingwave_common::types::DataType::List(list) => {
2405                    list.elem()
2406                        .as_struct()
2407                        .iter()
2408                        .for_each(|(name, data_type)| {
2409                            let res = schema_fields.insert(name, data_type);
2410                            // This assert is to make sure there is no duplicate field name in the schema.
2411                            assert!(res.is_none())
2412                        });
2413                }
2414                _ => {}
2415            };
2416            Some(fields.clone())
2417        }
2418        ArrowDataType::List(field) | ArrowDataType::Map(field, _) => {
2419            get_fields(our_field_type, field.data_type(), schema_fields)
2420        }
2421        _ => None, // not a supported complex type and unlikely to show up
2422    }
2423}
2424
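/// Recursively check that every arrow field can be produced from the RisingWave field with the
/// same name, tolerating known representation differences (decimal parameters, `Binary` vs
/// `LargeBinary`, and iceberg-specific field metadata / map entry naming for nested types).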
2425fn check_compatibility(
2426    schema_fields: HashMap<&str, &risingwave_common::types::DataType>,
2427    fields: &ArrowFields,
2428) -> anyhow::Result<bool> {
2429    for arrow_field in fields {
2430        let our_field_type = schema_fields
2431            .get(arrow_field.name().as_str())
2432            .ok_or_else(|| anyhow!("Field {} not found in our schema", arrow_field.name()))?;
2433
2434        // Iceberg source should be able to read iceberg decimal type.
2435        let converted_arrow_data_type = IcebergArrowConvert
2436            .to_arrow_field("", our_field_type)
2437            .map_err(|e| anyhow!(e))?
2438            .data_type()
2439            .clone();
2440
2441        let compatible = match (&converted_arrow_data_type, arrow_field.data_type()) {
2442            (ArrowDataType::Decimal128(_, _), ArrowDataType::Decimal128(_, _)) => true,
2443            (ArrowDataType::Binary, ArrowDataType::LargeBinary) => true,
2444            (ArrowDataType::LargeBinary, ArrowDataType::Binary) => true,
2445            (ArrowDataType::List(_), ArrowDataType::List(field))
2446            | (ArrowDataType::Map(_, _), ArrowDataType::Map(field, _)) => {
2447                let mut schema_fields = HashMap::new();
2448                get_fields(our_field_type, field.data_type(), &mut schema_fields)
2449                    .is_none_or(|fields| check_compatibility(schema_fields, &fields).unwrap())
2450            }
2451            // validate nested structs
2452            (ArrowDataType::Struct(_), ArrowDataType::Struct(fields)) => {
2453                let mut schema_fields = HashMap::new();
2454                our_field_type
2455                    .as_struct()
2456                    .iter()
2457                    .for_each(|(name, data_type)| {
2458                        let res = schema_fields.insert(name, data_type);
2459                        // This assert is to make sure there is no duplicate field name in the schema.
2460                        assert!(res.is_none())
2461                    });
2462                check_compatibility(schema_fields, fields)?
2463            }
2464            // cases where left != right (metadata, field name mismatch)
2465            //
2466            // all nested types: in iceberg `field_id` will always be present, but RW doesn't have it:
2467            // {"PARQUET:field_id": ".."}
2468            //
2469            // map: The standard name in arrow is "entries", "key", "value".
2470            // in iceberg-rs, it's called "key_value"
2471            (left, right) => left.equals_datatype(right),
2472        };
2473        if !compatible {
2474            bail!(
2475                "field {}'s type is incompatible\nRisingWave converted data type: {}\niceberg's data type: {}",
2476                arrow_field.name(),
2477                converted_arrow_data_type,
2478                arrow_field.data_type()
2479            );
2480        }
2481    }
2482    Ok(true)
2483}
2484
2485/// Check that the RisingWave schema is compatible with the iceberg table's arrow schema.
2486pub fn try_matches_arrow_schema(rw_schema: &Schema, arrow_schema: &ArrowSchema) -> Result<()> {
2487    if rw_schema.fields.len() != arrow_schema.fields().len() {
2488        bail!(
2489            "Schema length mismatch, risingwave is {}, and iceberg is {}",
2490            rw_schema.fields.len(),
2491            arrow_schema.fields.len()
2492        );
2493    }
2494
2495    let mut schema_fields = HashMap::new();
2496    rw_schema.fields.iter().for_each(|field| {
2497        let res = schema_fields.insert(field.name.as_str(), &field.data_type);
2498        // This assert is to make sure there is no duplicate field name in the schema.
2499        assert!(res.is_none())
2500    });
2501
2502    check_compatibility(schema_fields, &arrow_schema.fields)?;
2503    Ok(())
2504}
2505
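/// Encode the per-parallelism pre-commit payloads (plus the trailing snapshot id bytes) into a
/// single JSON byte blob for the coordinator; `deserialize_metadata` below is its inverse.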
2506fn serialize_metadata(metadata: Vec<Vec<u8>>) -> Vec<u8> {
2507    serde_json::to_vec(&metadata).unwrap()
2508}
2509
2510fn deserialize_metadata(bytes: Vec<u8>) -> Vec<Vec<u8>> {
2511    serde_json::from_slice(&bytes).unwrap()
2512}
2513
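/// Parse a `partition_by` option string such as `"v1, bucket(5,v2), year(v3)"` into
/// `(column, Transform)` pairs.
///
/// A minimal sketch of the expected output (not compiled):
///
/// ```ignore
/// let cols = parse_partition_by_exprs("v1, bucket(5,v2)".to_owned())?;
/// // cols == vec![("v1".to_owned(), Transform::Identity),
/// //              ("v2".to_owned(), Transform::Bucket(5))]
/// ```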
2514pub fn parse_partition_by_exprs(
2515    expr: String,
2516) -> std::result::Result<Vec<(String, Transform)>, anyhow::Error> {
2517    // captures column, transform(column), transform(n,column), transform(n, column)
2518    let re = Regex::new(r"(?<transform>\w+)(\(((?<n>\d+)?(?:,|(,\s)))?(?<field>\w+)\))?").unwrap();
2519    if !re.is_match(&expr) {
2520        bail!(
2521            "Invalid partition fields: {}\nHINT: Supported formats are column, transform(column), transform(n,column), transform(n, column)",
2522            expr
2523        )
2524    }
2525    let caps = re.captures_iter(&expr);
2526
2527    let mut partition_columns = vec![];
2528
2529    for mat in caps {
2530        let (column, transform) = if mat.name("n").is_none() && mat.name("field").is_none() {
2531            (&mat["transform"], Transform::Identity)
2532        } else {
2533            let mut func = mat["transform"].to_owned();
2534            if func == "bucket" || func == "truncate" {
2535                let n = &mat
2536                    .name("n")
2537                    .ok_or_else(|| anyhow!("The `n` must be set with `bucket` and `truncate`"))?
2538                    .as_str();
2539                func = format!("{func}[{n}]");
2540            }
2541            (
2542                &mat["field"],
2543                Transform::from_str(&func)
2544                    .with_context(|| format!("invalid transform function {}", func))?,
2545            )
2546        };
2547        partition_columns.push((column.to_owned(), transform));
2548    }
2549    Ok(partition_columns)
2550}
2551
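/// Branch that data commits target: the copy-on-write staging branch for upsert sinks running in
/// copy-on-write mode (see `should_enable_iceberg_cow` below), and the `main` branch otherwise.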
2552pub fn commit_branch(sink_type: &str, write_mode: IcebergWriteMode) -> String {
2553    if should_enable_iceberg_cow(sink_type, write_mode) {
2554        ICEBERG_COW_BRANCH.to_owned()
2555    } else {
2556        MAIN_BRANCH.to_owned()
2557    }
2558}
2559
2560pub fn should_enable_iceberg_cow(sink_type: &str, write_mode: IcebergWriteMode) -> bool {
2561    sink_type == SINK_TYPE_UPSERT && write_mode == IcebergWriteMode::CopyOnWrite
2562}
2563
2564impl crate::with_options::WithOptions for IcebergWriteMode {}
2565
2566impl crate::with_options::WithOptions for CompactionType {}
2567
2568#[cfg(test)]
2569mod test {
2570    use std::collections::BTreeMap;
2571
2572    use risingwave_common::array::arrow::arrow_schema_iceberg::FieldRef as ArrowFieldRef;
2573    use risingwave_common::types::{DataType, MapType, StructType};
2574
2575    use crate::connector_common::{IcebergCommon, IcebergTableIdentifier};
2576    use crate::sink::decouple_checkpoint_log_sink::ICEBERG_DEFAULT_COMMIT_CHECKPOINT_INTERVAL;
2577    use crate::sink::iceberg::{
2578        COMPACTION_INTERVAL_SEC, COMPACTION_MAX_SNAPSHOTS_NUM, CompactionType, ENABLE_COMPACTION,
2579        ENABLE_SNAPSHOT_EXPIRATION, IcebergConfig, IcebergWriteMode,
2580        SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_FILES, SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_META_DATA,
2581        SNAPSHOT_EXPIRATION_MAX_AGE_MILLIS, SNAPSHOT_EXPIRATION_RETAIN_LAST, WRITE_MODE,
2582    };
2583
2584    pub const DEFAULT_ICEBERG_COMPACTION_INTERVAL: u64 = 3600; // 1 hour
2585
2586    #[test]
2587    fn test_compatible_arrow_schema() {
2588        use arrow_schema_iceberg::{DataType as ArrowDataType, Field as ArrowField};
2589
2590        use super::*;
2591        let risingwave_schema = Schema::new(vec![
2592            Field::with_name(DataType::Int32, "a"),
2593            Field::with_name(DataType::Int32, "b"),
2594            Field::with_name(DataType::Int32, "c"),
2595        ]);
2596        let arrow_schema = ArrowSchema::new(vec![
2597            ArrowField::new("a", ArrowDataType::Int32, false),
2598            ArrowField::new("b", ArrowDataType::Int32, false),
2599            ArrowField::new("c", ArrowDataType::Int32, false),
2600        ]);
2601
2602        try_matches_arrow_schema(&risingwave_schema, &arrow_schema).unwrap();
2603
2604        let risingwave_schema = Schema::new(vec![
2605            Field::with_name(DataType::Int32, "d"),
2606            Field::with_name(DataType::Int32, "c"),
2607            Field::with_name(DataType::Int32, "a"),
2608            Field::with_name(DataType::Int32, "b"),
2609        ]);
2610        let arrow_schema = ArrowSchema::new(vec![
2611            ArrowField::new("a", ArrowDataType::Int32, false),
2612            ArrowField::new("b", ArrowDataType::Int32, false),
2613            ArrowField::new("d", ArrowDataType::Int32, false),
2614            ArrowField::new("c", ArrowDataType::Int32, false),
2615        ]);
2616        try_matches_arrow_schema(&risingwave_schema, &arrow_schema).unwrap();
2617
2618        let risingwave_schema = Schema::new(vec![
2619            Field::with_name(
2620                DataType::Struct(StructType::new(vec![
2621                    ("a1", DataType::Int32),
2622                    (
2623                        "a2",
2624                        DataType::Struct(StructType::new(vec![
2625                            ("a21", DataType::Bytea),
2626                            (
2627                                "a22",
2628                                DataType::Map(MapType::from_kv(DataType::Varchar, DataType::Jsonb)),
2629                            ),
2630                        ])),
2631                    ),
2632                ])),
2633                "a",
2634            ),
2635            Field::with_name(
2636                DataType::list(DataType::Struct(StructType::new(vec![
2637                    ("b1", DataType::Int32),
2638                    ("b2", DataType::Bytea),
2639                    (
2640                        "b3",
2641                        DataType::Map(MapType::from_kv(DataType::Varchar, DataType::Jsonb)),
2642                    ),
2643                ]))),
2644                "b",
2645            ),
2646            Field::with_name(
2647                DataType::Map(MapType::from_kv(
2648                    DataType::Varchar,
2649                    DataType::list(DataType::Struct(StructType::new([
2650                        ("c1", DataType::Int32),
2651                        ("c2", DataType::Bytea),
2652                        (
2653                            "c3",
2654                            DataType::Map(MapType::from_kv(DataType::Varchar, DataType::Jsonb)),
2655                        ),
2656                    ]))),
2657                )),
2658                "c",
2659            ),
2660        ]);
2661        let arrow_schema = ArrowSchema::new(vec![
2662            ArrowField::new(
2663                "a",
2664                ArrowDataType::Struct(ArrowFields::from(vec![
2665                    ArrowField::new("a1", ArrowDataType::Int32, false),
2666                    ArrowField::new(
2667                        "a2",
2668                        ArrowDataType::Struct(ArrowFields::from(vec![
2669                            ArrowField::new("a21", ArrowDataType::LargeBinary, false),
2670                            ArrowField::new_map(
2671                                "a22",
2672                                "entries",
2673                                ArrowFieldRef::new(ArrowField::new(
2674                                    "key",
2675                                    ArrowDataType::Utf8,
2676                                    false,
2677                                )),
2678                                ArrowFieldRef::new(ArrowField::new(
2679                                    "value",
2680                                    ArrowDataType::Utf8,
2681                                    false,
2682                                )),
2683                                false,
2684                                false,
2685                            ),
2686                        ])),
2687                        false,
2688                    ),
2689                ])),
2690                false,
2691            ),
2692            ArrowField::new(
2693                "b",
2694                ArrowDataType::List(ArrowFieldRef::new(ArrowField::new_list_field(
2695                    ArrowDataType::Struct(ArrowFields::from(vec![
2696                        ArrowField::new("b1", ArrowDataType::Int32, false),
2697                        ArrowField::new("b2", ArrowDataType::LargeBinary, false),
2698                        ArrowField::new_map(
2699                            "b3",
2700                            "entries",
2701                            ArrowFieldRef::new(ArrowField::new("key", ArrowDataType::Utf8, false)),
2702                            ArrowFieldRef::new(ArrowField::new(
2703                                "value",
2704                                ArrowDataType::Utf8,
2705                                false,
2706                            )),
2707                            false,
2708                            false,
2709                        ),
2710                    ])),
2711                    false,
2712                ))),
2713                false,
2714            ),
2715            ArrowField::new_map(
2716                "c",
2717                "entries",
2718                ArrowFieldRef::new(ArrowField::new("key", ArrowDataType::Utf8, false)),
2719                ArrowFieldRef::new(ArrowField::new(
2720                    "value",
2721                    ArrowDataType::List(ArrowFieldRef::new(ArrowField::new_list_field(
2722                        ArrowDataType::Struct(ArrowFields::from(vec![
2723                            ArrowField::new("c1", ArrowDataType::Int32, false),
2724                            ArrowField::new("c2", ArrowDataType::LargeBinary, false),
2725                            ArrowField::new_map(
2726                                "c3",
2727                                "entries",
2728                                ArrowFieldRef::new(ArrowField::new(
2729                                    "key",
2730                                    ArrowDataType::Utf8,
2731                                    false,
2732                                )),
2733                                ArrowFieldRef::new(ArrowField::new(
2734                                    "value",
2735                                    ArrowDataType::Utf8,
2736                                    false,
2737                                )),
2738                                false,
2739                                false,
2740                            ),
2741                        ])),
2742                        false,
2743                    ))),
2744                    false,
2745                )),
2746                false,
2747                false,
2748            ),
2749        ]);
2750        try_matches_arrow_schema(&risingwave_schema, &arrow_schema).unwrap();
2751    }
2752
2753    #[test]
2754    fn test_parse_iceberg_config() {
2755        let values = [
2756            ("connector", "iceberg"),
2757            ("type", "upsert"),
2758            ("primary_key", "v1"),
2759            ("partition_by", "v1, identity(v1), truncate(4,v2), bucket(5,v1), year(v3), month(v4), day(v5), hour(v6), void(v1)"),
2760            ("warehouse.path", "s3://iceberg"),
2761            ("s3.endpoint", "http://127.0.0.1:9301"),
2762            ("s3.access.key", "hummockadmin"),
2763            ("s3.secret.key", "hummockadmin"),
2764            ("s3.path.style.access", "true"),
2765            ("s3.region", "us-east-1"),
2766            ("catalog.type", "jdbc"),
2767            ("catalog.name", "demo"),
2768            ("catalog.uri", "jdbc://postgresql://postgres:5432/iceberg"),
2769            ("catalog.jdbc.user", "admin"),
2770            ("catalog.jdbc.password", "123456"),
2771            ("database.name", "demo_db"),
2772            ("table.name", "demo_table"),
2773            ("enable_compaction", "true"),
2774            ("compaction_interval_sec", "1800"),
2775            ("enable_snapshot_expiration", "true"),
2776        ]
2777        .into_iter()
2778        .map(|(k, v)| (k.to_owned(), v.to_owned()))
2779        .collect();
2780
2781        let iceberg_config = IcebergConfig::from_btreemap(values).unwrap();
2782
2783        let expected_iceberg_config = IcebergConfig {
2784            common: IcebergCommon {
2785                warehouse_path: Some("s3://iceberg".to_owned()),
2786                catalog_uri: Some("jdbc:postgresql://postgres:5432/iceberg".to_owned()),
2787                s3_region: Some("us-east-1".to_owned()),
2788                s3_endpoint: Some("http://127.0.0.1:9301".to_owned()),
2789                s3_access_key: Some("hummockadmin".to_owned()),
2790                s3_secret_key: Some("hummockadmin".to_owned()),
2791                s3_iam_role_arn: None,
2792                gcs_credential: None,
2793                catalog_type: Some("jdbc".to_owned()),
2794                glue_id: None,
2795                glue_region: None,
2796                glue_access_key: None,
2797                glue_secret_key: None,
2798                glue_iam_role_arn: None,
2799                catalog_name: Some("demo".to_owned()),
2800                s3_path_style_access: Some(true),
2801                catalog_credential: None,
2802                catalog_oauth2_server_uri: None,
2803                catalog_scope: None,
2804                catalog_token: None,
2805                enable_config_load: None,
2806                rest_signing_name: None,
2807                rest_signing_region: None,
2808                rest_sigv4_enabled: None,
2809                hosted_catalog: None,
2810                azblob_account_name: None,
2811                azblob_account_key: None,
2812                azblob_endpoint_url: None,
2813                catalog_header: None,
2814                adlsgen2_account_name: None,
2815                adlsgen2_account_key: None,
2816                adlsgen2_endpoint: None,
2817                vended_credentials: None,
2818            },
2819            table: IcebergTableIdentifier {
2820                database_name: Some("demo_db".to_owned()),
2821                table_name: "demo_table".to_owned(),
2822            },
2823            r#type: "upsert".to_owned(),
2824            force_append_only: false,
2825            primary_key: Some(vec!["v1".to_owned()]),
2826            partition_by: Some("v1, identity(v1), truncate(4,v2), bucket(5,v1), year(v3), month(v4), day(v5), hour(v6), void(v1)".to_owned()),
2827            java_catalog_props: [("jdbc.user", "admin"), ("jdbc.password", "123456")]
2828                .into_iter()
2829                .map(|(k, v)| (k.to_owned(), v.to_owned()))
2830                .collect(),
2831            commit_checkpoint_interval: ICEBERG_DEFAULT_COMMIT_CHECKPOINT_INTERVAL,
2832            create_table_if_not_exists: false,
2833            is_exactly_once: Some(true),
2834            commit_retry_num: 8,
2835            enable_compaction: true,
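            // The fixture sets "compaction_interval_sec" to "1800"; the expectation is
            // written relative to the default, assuming DEFAULT_ICEBERG_COMPACTION_INTERVAL
            // is 3600 seconds.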
2836            compaction_interval_sec: Some(DEFAULT_ICEBERG_COMPACTION_INTERVAL / 2),
2837            enable_snapshot_expiration: true,
2838            write_mode: IcebergWriteMode::MergeOnRead,
2839            snapshot_expiration_max_age_millis: None,
2840            snapshot_expiration_retain_last: None,
2841            snapshot_expiration_clear_expired_files: true,
2842            snapshot_expiration_clear_expired_meta_data: true,
2843            max_snapshots_num_before_compaction: None,
2844            small_files_threshold_mb: None,
2845            delete_files_count_threshold: None,
2846            trigger_snapshot_count: None,
2847            target_file_size_mb: None,
2848            compaction_type: None,
2849        };
2850
2851        assert_eq!(iceberg_config, expected_iceberg_config);
2852
2853        assert_eq!(
2854            &iceberg_config.full_table_name().unwrap().to_string(),
2855            "demo_db.demo_table"
2856        );
2857    }
2858
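    // Shared helper for the `#[ignore]`d catalog smoke tests below: builds an
    // `IcebergConfig` from the given options and fails if the target table
    // cannot be loaded through the configured catalog.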
2859    async fn test_create_catalog(configs: BTreeMap<String, String>) {
2860        let iceberg_config = IcebergConfig::from_btreemap(configs).unwrap();
2861
2862        let _table = iceberg_config.load_table().await.unwrap();
2863    }
2864
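    // Needs a reachable S3-compatible store at 127.0.0.1:9301 holding an existing
    // table `s1.t1` under the configured warehouse path, hence `#[ignore]`.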
2865    #[tokio::test]
2866    #[ignore]
2867    async fn test_storage_catalog() {
2868        let values = [
2869            ("connector", "iceberg"),
2870            ("type", "append-only"),
2871            ("force_append_only", "true"),
2872            ("s3.endpoint", "http://127.0.0.1:9301"),
2873            ("s3.access.key", "hummockadmin"),
2874            ("s3.secret.key", "hummockadmin"),
2875            ("s3.region", "us-east-1"),
2876            ("s3.path.style.access", "true"),
2877            ("catalog.name", "demo"),
2878            ("catalog.type", "storage"),
2879            ("warehouse.path", "s3://icebergdata/demo"),
2880            ("database.name", "s1"),
2881            ("table.name", "t1"),
2882        ]
2883        .into_iter()
2884        .map(|(k, v)| (k.to_owned(), v.to_owned()))
2885        .collect();
2886
2887        test_create_catalog(values).await;
2888    }
2889
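    // Needs a running Iceberg REST catalog at http://192.168.167.4:8181 in addition
    // to the S3 endpoint in the fixture, hence `#[ignore]`.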
2890    #[tokio::test]
2891    #[ignore]
2892    async fn test_rest_catalog() {
2893        let values = [
2894            ("connector", "iceberg"),
2895            ("type", "append-only"),
2896            ("force_append_only", "true"),
2897            ("s3.endpoint", "http://127.0.0.1:9301"),
2898            ("s3.access.key", "hummockadmin"),
2899            ("s3.secret.key", "hummockadmin"),
2900            ("s3.region", "us-east-1"),
2901            ("s3.path.style.access", "true"),
2902            ("catalog.name", "demo"),
2903            ("catalog.type", "rest"),
2904            ("catalog.uri", "http://192.168.167.4:8181"),
2905            ("warehouse.path", "s3://icebergdata/demo"),
2906            ("database.name", "s1"),
2907            ("table.name", "t1"),
2908        ]
2909        .into_iter()
2910        .map(|(k, v)| (k.to_owned(), v.to_owned()))
2911        .collect();
2912
2913        test_create_catalog(values).await;
2914    }
2915
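    // Needs a PostgreSQL-backed JDBC catalog reachable at localhost:5432, hence `#[ignore]`.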
2916    #[tokio::test]
2917    #[ignore]
2918    async fn test_jdbc_catalog() {
2919        let values = [
2920            ("connector", "iceberg"),
2921            ("type", "append-only"),
2922            ("force_append_only", "true"),
2923            ("s3.endpoint", "http://127.0.0.1:9301"),
2924            ("s3.access.key", "hummockadmin"),
2925            ("s3.secret.key", "hummockadmin"),
2926            ("s3.region", "us-east-1"),
2927            ("s3.path.style.access", "true"),
2928            ("catalog.name", "demo"),
2929            ("catalog.type", "jdbc"),
2930            ("catalog.uri", "jdbc:postgresql://localhost:5432/iceberg"),
2931            ("catalog.jdbc.user", "admin"),
2932            ("catalog.jdbc.password", "123456"),
2933            ("warehouse.path", "s3://icebergdata/demo"),
2934            ("database.name", "s1"),
2935            ("table.name", "t1"),
2936        ]
2937        .into_iter()
2938        .map(|(k, v)| (k.to_owned(), v.to_owned()))
2939        .collect();
2940
2941        test_create_catalog(values).await;
2942    }
2943
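    // Needs a Hive Metastore reachable at thrift://localhost:9083, hence `#[ignore]`.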
2944    #[tokio::test]
2945    #[ignore]
2946    async fn test_hive_catalog() {
2947        let values = [
2948            ("connector", "iceberg"),
2949            ("type", "append-only"),
2950            ("force_append_only", "true"),
2951            ("s3.endpoint", "http://127.0.0.1:9301"),
2952            ("s3.access.key", "hummockadmin"),
2953            ("s3.secret.key", "hummockadmin"),
2954            ("s3.region", "us-east-1"),
2955            ("s3.path.style.access", "true"),
2956            ("catalog.name", "demo"),
2957            ("catalog.type", "hive"),
2958            ("catalog.uri", "thrift://localhost:9083"),
2959            ("warehouse.path", "s3://icebergdata/demo"),
2960            ("database.name", "s1"),
2961            ("table.name", "t1"),
2962        ]
2963        .into_iter()
2964        .map(|(k, v)| (k.to_owned(), v.to_owned()))
2965        .collect();
2966
2967        test_create_catalog(values).await;
2968    }
2969
2970    #[test]
2971    fn test_config_constants_consistency() {
2972        // This test ensures our constants match the expected configuration option names.
2973        // If you rename a constant, this test reminds you to update both places.
2974        assert_eq!(ENABLE_COMPACTION, "enable_compaction");
2975        assert_eq!(COMPACTION_INTERVAL_SEC, "compaction_interval_sec");
2976        assert_eq!(ENABLE_SNAPSHOT_EXPIRATION, "enable_snapshot_expiration");
2977        assert_eq!(WRITE_MODE, "write_mode");
2978        assert_eq!(
2979            SNAPSHOT_EXPIRATION_RETAIN_LAST,
2980            "snapshot_expiration_retain_last"
2981        );
2982        assert_eq!(
2983            SNAPSHOT_EXPIRATION_MAX_AGE_MILLIS,
2984            "snapshot_expiration_max_age_millis"
2985        );
2986        assert_eq!(
2987            SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_FILES,
2988            "snapshot_expiration_clear_expired_files"
2989        );
2990        assert_eq!(
2991            SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_META_DATA,
2992            "snapshot_expiration_clear_expired_meta_data"
2993        );
2994        assert_eq!(COMPACTION_MAX_SNAPSHOTS_NUM, "compaction.max_snapshots_num");
2995    }
2996
2997    #[test]
2998    fn test_parse_iceberg_compaction_config() {
2999        // Test parsing of the new `compaction.*`-prefixed config options.
3000        let values = [
3001            ("connector", "iceberg"),
3002            ("type", "upsert"),
3003            ("primary_key", "id"),
3004            ("warehouse.path", "s3://iceberg"),
3005            ("s3.endpoint", "http://127.0.0.1:9301"),
3006            ("s3.access.key", "test"),
3007            ("s3.secret.key", "test"),
3008            ("s3.region", "us-east-1"),
3009            ("catalog.type", "storage"),
3010            ("catalog.name", "demo"),
3011            ("database.name", "test_db"),
3012            ("table.name", "test_table"),
3013            ("enable_compaction", "true"),
3014            ("compaction.max_snapshots_num", "100"),
3015            ("compaction.small_files_threshold_mb", "512"),
3016            ("compaction.delete_files_count_threshold", "50"),
3017            ("compaction.trigger_snapshot_count", "10"),
3018            ("compaction.target_file_size_mb", "256"),
3019            ("compaction.type", "full"),
3020        ]
3021        .into_iter()
3022        .map(|(k, v)| (k.to_owned(), v.to_owned()))
3023        .collect();
3024
3025        let iceberg_config = IcebergConfig::from_btreemap(values).unwrap();
3026
3027        // Verify all compaction config fields are parsed correctly
3028        assert!(iceberg_config.enable_compaction);
3029        assert_eq!(
3030            iceberg_config.max_snapshots_num_before_compaction,
3031            Some(100)
3032        );
3033        assert_eq!(iceberg_config.small_files_threshold_mb, Some(512));
3034        assert_eq!(iceberg_config.delete_files_count_threshold, Some(50));
3035        assert_eq!(iceberg_config.trigger_snapshot_count, Some(10));
3036        assert_eq!(iceberg_config.target_file_size_mb, Some(256));
3037        assert_eq!(iceberg_config.compaction_type, Some(CompactionType::Full));
3038    }
3039}