risingwave_connector/sink/iceberg/mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod prometheus;
16use std::collections::{BTreeMap, HashMap};
17use std::fmt::Debug;
18use std::num::NonZeroU64;
19use std::str::FromStr;
20use std::sync::Arc;
21use std::time::Duration;
22
23use anyhow::{Context, anyhow};
24use async_trait::async_trait;
25use await_tree::InstrumentAwait;
26use iceberg::arrow::{
27    RecordBatchPartitionSplitter, arrow_schema_to_schema, schema_to_arrow_schema,
28};
29use iceberg::spec::{
30    DataFile, MAIN_BRANCH, Operation, PartitionSpecRef, SchemaRef as IcebergSchemaRef,
31    SerializedDataFile, Transform, UnboundPartitionField, UnboundPartitionSpec,
32};
33use iceberg::table::Table;
34use iceberg::transaction::{ApplyTransactionAction, FastAppendAction, Transaction};
35use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
36use iceberg::writer::base_writer::equality_delete_writer::{
37    EqualityDeleteFileWriterBuilder, EqualityDeleteWriterConfig,
38};
39use iceberg::writer::base_writer::position_delete_file_writer::{
40    POSITION_DELETE_SCHEMA, PositionDeleteFileWriterBuilder,
41};
42use iceberg::writer::delta_writer::{DELETE_OP, DeltaWriterBuilder, INSERT_OP};
43use iceberg::writer::file_writer::ParquetWriterBuilder;
44use iceberg::writer::file_writer::location_generator::{
45    DefaultFileNameGenerator, DefaultLocationGenerator,
46};
47use iceberg::writer::file_writer::rolling_writer::RollingFileWriterBuilder;
48use iceberg::writer::task_writer::TaskWriter;
49use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
50use iceberg::{Catalog, NamespaceIdent, TableCreation, TableIdent};
51use itertools::Itertools;
52use parquet::file::properties::WriterProperties;
53use prometheus::monitored_general_writer::MonitoredGeneralWriterBuilder;
54use regex::Regex;
55use risingwave_common::array::arrow::arrow_array_iceberg::{Int32Array, RecordBatch};
56use risingwave_common::array::arrow::arrow_schema_iceberg::{
57    self, DataType as ArrowDataType, Field as ArrowField, Fields as ArrowFields,
58    Schema as ArrowSchema, SchemaRef,
59};
60use risingwave_common::array::arrow::{IcebergArrowConvert, IcebergCreateTableArrowConvert};
61use risingwave_common::array::{Op, StreamChunk};
62use risingwave_common::bail;
63use risingwave_common::bitmap::Bitmap;
64use risingwave_common::catalog::{Field, Schema};
65use risingwave_common::error::IcebergError;
66use risingwave_common::metrics::{LabelGuardedHistogram, LabelGuardedIntCounter};
67use risingwave_common_estimate_size::EstimateSize;
68use risingwave_pb::connector_service::SinkMetadata;
69use risingwave_pb::connector_service::sink_metadata::Metadata::Serialized;
70use risingwave_pb::connector_service::sink_metadata::SerializedMetadata;
71use serde::{Deserialize, Serialize};
72use serde_json::from_value;
73use serde_with::{DisplayFromStr, serde_as};
74use thiserror_ext::AsReport;
75use tokio::sync::mpsc::UnboundedSender;
76use tokio_retry::Retry;
77use tokio_retry::strategy::{ExponentialBackoff, jitter};
78use tracing::warn;
79use url::Url;
80use uuid::Uuid;
81use with_options::WithOptions;
82
83use super::decouple_checkpoint_log_sink::iceberg_default_commit_checkpoint_interval;
84use super::{
85    GLOBAL_SINK_METRICS, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT, Sink,
86    SinkError, SinkWriterParam,
87};
88use crate::connector_common::{IcebergCommon, IcebergSinkCompactionUpdate, IcebergTableIdentifier};
89use crate::enforce_secret::EnforceSecret;
90use crate::sink::catalog::SinkId;
91use crate::sink::coordinate::CoordinatedLogSinker;
92use crate::sink::writer::SinkWriter;
93use crate::sink::{
94    Result, SinglePhaseCommitCoordinator, SinkCommitCoordinator, SinkParam,
95    TwoPhaseCommitCoordinator,
96};
97use crate::{deserialize_bool_from_string, deserialize_optional_string_seq_from_string};
98
99pub const ICEBERG_SINK: &str = "iceberg";
100pub const ICEBERG_COW_BRANCH: &str = "ingestion";
101pub const ICEBERG_WRITE_MODE_MERGE_ON_READ: &str = "merge-on-read";
102pub const ICEBERG_WRITE_MODE_COPY_ON_WRITE: &str = "copy-on-write";
103pub const ICEBERG_COMPACTION_TYPE_FULL: &str = "full";
104pub const ICEBERG_COMPACTION_TYPE_SMALL_FILES: &str = "small-files";
105pub const ICEBERG_COMPACTION_TYPE_FILES_WITH_DELETE: &str = "files-with-delete";
106
107pub const PARTITION_DATA_ID_START: i32 = 1000;
108
109#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default, Serialize, Deserialize)]
110#[serde(rename_all = "kebab-case")]
111pub enum IcebergWriteMode {
112    #[default]
113    MergeOnRead,
114    CopyOnWrite,
115}
116
117impl IcebergWriteMode {
118    pub fn as_str(self) -> &'static str {
119        match self {
120            IcebergWriteMode::MergeOnRead => ICEBERG_WRITE_MODE_MERGE_ON_READ,
121            IcebergWriteMode::CopyOnWrite => ICEBERG_WRITE_MODE_COPY_ON_WRITE,
122        }
123    }
124}
125
126impl std::str::FromStr for IcebergWriteMode {
127    type Err = SinkError;
128
129    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
130        match s {
131            ICEBERG_WRITE_MODE_MERGE_ON_READ => Ok(IcebergWriteMode::MergeOnRead),
132            ICEBERG_WRITE_MODE_COPY_ON_WRITE => Ok(IcebergWriteMode::CopyOnWrite),
133            _ => Err(SinkError::Config(anyhow!(format!(
134                "invalid write_mode: {}, must be one of: {}, {}",
135                s, ICEBERG_WRITE_MODE_MERGE_ON_READ, ICEBERG_WRITE_MODE_COPY_ON_WRITE
136            )))),
137        }
138    }
139}
140
141impl TryFrom<&str> for IcebergWriteMode {
142    type Error = <Self as std::str::FromStr>::Err;
143
144    fn try_from(value: &str) -> std::result::Result<Self, Self::Error> {
145        value.parse()
146    }
147}
148
149impl TryFrom<String> for IcebergWriteMode {
150    type Error = <Self as std::str::FromStr>::Err;
151
152    fn try_from(value: String) -> std::result::Result<Self, Self::Error> {
153        value.as_str().parse()
154    }
155}
156
157impl std::fmt::Display for IcebergWriteMode {
158    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
159        f.write_str(self.as_str())
160    }
161}
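
// For example, `"copy-on-write".parse::<IcebergWriteMode>()` yields
// `Ok(IcebergWriteMode::CopyOnWrite)`, `IcebergWriteMode::CopyOnWrite.to_string()` returns
// "copy-on-write", and any other input string is rejected with a `SinkError::Config`.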
162
163// Configuration constants
164pub const ENABLE_COMPACTION: &str = "enable_compaction";
165pub const COMPACTION_INTERVAL_SEC: &str = "compaction_interval_sec";
166pub const ENABLE_SNAPSHOT_EXPIRATION: &str = "enable_snapshot_expiration";
167pub const WRITE_MODE: &str = "write_mode";
168pub const SNAPSHOT_EXPIRATION_RETAIN_LAST: &str = "snapshot_expiration_retain_last";
169pub const SNAPSHOT_EXPIRATION_MAX_AGE_MILLIS: &str = "snapshot_expiration_max_age_millis";
170pub const SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_FILES: &str = "snapshot_expiration_clear_expired_files";
171pub const SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_META_DATA: &str =
172    "snapshot_expiration_clear_expired_meta_data";
173pub const COMPACTION_MAX_SNAPSHOTS_NUM: &str = "compaction.max_snapshots_num";
174
175pub const COMPACTION_SMALL_FILES_THRESHOLD_MB: &str = "compaction.small_files_threshold_mb";
176
177pub const COMPACTION_DELETE_FILES_COUNT_THRESHOLD: &str = "compaction.delete_files_count_threshold";
178
179pub const COMPACTION_TRIGGER_SNAPSHOT_COUNT: &str = "compaction.trigger_snapshot_count";
180
181pub const COMPACTION_TARGET_FILE_SIZE_MB: &str = "compaction.target_file_size_mb";
182
183pub const COMPACTION_TYPE: &str = "compaction.type";
184
185fn default_commit_retry_num() -> u32 {
186    8
187}
188
189fn default_iceberg_write_mode() -> IcebergWriteMode {
190    IcebergWriteMode::MergeOnRead
191}
192
193fn default_true() -> bool {
194    true
195}
196
197fn default_some_true() -> Option<bool> {
198    Some(true)
199}
200
201/// Compaction type for Iceberg sink
202#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
203#[serde(rename_all = "kebab-case")]
204pub enum CompactionType {
205    /// Full compaction - rewrites all data files
206    #[default]
207    Full,
208    /// Small files compaction - only compact small files
209    SmallFiles,
210    /// Files with delete compaction - only compact files that have associated delete files
211    FilesWithDelete,
212}
213
214impl CompactionType {
215    pub fn as_str(&self) -> &'static str {
216        match self {
217            CompactionType::Full => ICEBERG_COMPACTION_TYPE_FULL,
218            CompactionType::SmallFiles => ICEBERG_COMPACTION_TYPE_SMALL_FILES,
219            CompactionType::FilesWithDelete => ICEBERG_COMPACTION_TYPE_FILES_WITH_DELETE,
220        }
221    }
222}
223
224impl std::str::FromStr for CompactionType {
225    type Err = SinkError;
226
227    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
228        match s {
229            ICEBERG_COMPACTION_TYPE_FULL => Ok(CompactionType::Full),
230            ICEBERG_COMPACTION_TYPE_SMALL_FILES => Ok(CompactionType::SmallFiles),
231            ICEBERG_COMPACTION_TYPE_FILES_WITH_DELETE => Ok(CompactionType::FilesWithDelete),
232            _ => Err(SinkError::Config(anyhow!(format!(
233                "invalid compaction_type: {}, must be one of: {}, {}, {}",
234                s,
235                ICEBERG_COMPACTION_TYPE_FULL,
236                ICEBERG_COMPACTION_TYPE_SMALL_FILES,
237                ICEBERG_COMPACTION_TYPE_FILES_WITH_DELETE
238            )))),
239        }
240    }
241}
242
243impl TryFrom<&str> for CompactionType {
244    type Error = <Self as std::str::FromStr>::Err;
245
246    fn try_from(value: &str) -> std::result::Result<Self, Self::Error> {
247        value.parse()
248    }
249}
250
251impl TryFrom<String> for CompactionType {
252    type Error = <Self as std::str::FromStr>::Err;
253
254    fn try_from(value: String) -> std::result::Result<Self, Self::Error> {
255        value.as_str().parse()
256    }
257}
258
259impl std::fmt::Display for CompactionType {
260    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
261        write!(f, "{}", self.as_str())
262    }
263}
264
265#[serde_as]
266#[derive(Debug, Clone, PartialEq, Eq, Deserialize, WithOptions)]
267pub struct IcebergConfig {
268    pub r#type: String, // accepts "append-only" or "upsert"
269
270    #[serde(default, deserialize_with = "deserialize_bool_from_string")]
271    pub force_append_only: bool,
272
273    #[serde(flatten)]
274    common: IcebergCommon,
275
276    #[serde(flatten)]
277    table: IcebergTableIdentifier,
278
279    #[serde(
280        rename = "primary_key",
281        default,
282        deserialize_with = "deserialize_optional_string_seq_from_string"
283    )]
284    pub primary_key: Option<Vec<String>>,
285
286    // Properties passed through to the Java catalog.
287    #[serde(skip)]
288    pub java_catalog_props: HashMap<String, String>,
289
290    #[serde(default)]
291    pub partition_by: Option<String>,
292
293    /// Commit every n (> 0) checkpoints; the default is 60.
294    #[serde(default = "iceberg_default_commit_checkpoint_interval")]
295    #[serde_as(as = "DisplayFromStr")]
296    #[with_option(allow_alter_on_fly)]
297    pub commit_checkpoint_interval: u64,
298
299    #[serde(default, deserialize_with = "deserialize_bool_from_string")]
300    pub create_table_if_not_exists: bool,
301
302    /// Whether exactly-once delivery is enabled; the default is true.
303    #[serde(default = "default_some_true")]
304    #[serde_as(as = "Option<DisplayFromStr>")]
305    pub is_exactly_once: Option<bool>,
306    // Number of commit retries when an Iceberg commit fails; the default is 8.
307    // # TODO
308    // The Iceberg table may store the commit retry count in its table metadata.
309    // We should look that up and use it as the default commit retry count first.
310    #[serde(default = "default_commit_retry_num")]
311    pub commit_retry_num: u32,
312
313    /// Whether to enable iceberg compaction.
314    #[serde(
315        rename = "enable_compaction",
316        default,
317        deserialize_with = "deserialize_bool_from_string"
318    )]
319    #[with_option(allow_alter_on_fly)]
320    pub enable_compaction: bool,
321
322    /// The interval (in seconds) between Iceberg compactions.
323    #[serde(rename = "compaction_interval_sec", default)]
324    #[serde_as(as = "Option<DisplayFromStr>")]
325    #[with_option(allow_alter_on_fly)]
326    pub compaction_interval_sec: Option<u64>,
327
328    /// Whether to enable Iceberg snapshot expiration.
329    #[serde(
330        rename = "enable_snapshot_expiration",
331        default,
332        deserialize_with = "deserialize_bool_from_string"
333    )]
334    #[with_option(allow_alter_on_fly)]
335    pub enable_snapshot_expiration: bool,
336
337    /// The iceberg write mode, can be `merge-on-read` or `copy-on-write`.
338    #[serde(rename = "write_mode", default = "default_iceberg_write_mode")]
339    pub write_mode: IcebergWriteMode,
340
341    /// The maximum age (in milliseconds) of snapshots before they expire.
342    /// For example, if set to 3600000, snapshots older than one hour will be expired.
343    #[serde(rename = "snapshot_expiration_max_age_millis", default)]
344    #[serde_as(as = "Option<DisplayFromStr>")]
345    #[with_option(allow_alter_on_fly)]
346    pub snapshot_expiration_max_age_millis: Option<i64>,
347
348    /// The number of most recent snapshots to retain.
349    #[serde(rename = "snapshot_expiration_retain_last", default)]
350    #[serde_as(as = "Option<DisplayFromStr>")]
351    #[with_option(allow_alter_on_fly)]
352    pub snapshot_expiration_retain_last: Option<i32>,
353
354    #[serde(
355        rename = "snapshot_expiration_clear_expired_files",
356        default = "default_true",
357        deserialize_with = "deserialize_bool_from_string"
358    )]
359    #[with_option(allow_alter_on_fly)]
360    pub snapshot_expiration_clear_expired_files: bool,
361
362    #[serde(
363        rename = "snapshot_expiration_clear_expired_meta_data",
364        default = "default_true",
365        deserialize_with = "deserialize_bool_from_string"
366    )]
367    #[with_option(allow_alter_on_fly)]
368    pub snapshot_expiration_clear_expired_meta_data: bool,
369
370    /// The maximum number of snapshots allowed since the last rewrite operation.
371    /// If set, the sink checks the snapshot count and waits if it is exceeded.
372    #[serde(rename = "compaction.max_snapshots_num", default)]
373    #[serde_as(as = "Option<DisplayFromStr>")]
374    #[with_option(allow_alter_on_fly)]
375    pub max_snapshots_num_before_compaction: Option<usize>,
376
377    #[serde(rename = "compaction.small_files_threshold_mb", default)]
378    #[serde_as(as = "Option<DisplayFromStr>")]
379    #[with_option(allow_alter_on_fly)]
380    pub small_files_threshold_mb: Option<u64>,
381
382    #[serde(rename = "compaction.delete_files_count_threshold", default)]
383    #[serde_as(as = "Option<DisplayFromStr>")]
384    #[with_option(allow_alter_on_fly)]
385    pub delete_files_count_threshold: Option<usize>,
386
387    #[serde(rename = "compaction.trigger_snapshot_count", default)]
388    #[serde_as(as = "Option<DisplayFromStr>")]
389    #[with_option(allow_alter_on_fly)]
390    pub trigger_snapshot_count: Option<usize>,
391
392    #[serde(rename = "compaction.target_file_size_mb", default)]
393    #[serde_as(as = "Option<DisplayFromStr>")]
394    #[with_option(allow_alter_on_fly)]
395    pub target_file_size_mb: Option<u64>,
396
397    /// Compaction type: `full`, `small-files`, or `files-with-delete`.
398    /// Defaults to `full` if not set.
399    #[serde(rename = "compaction.type", default)]
400    #[with_option(allow_alter_on_fly)]
401    pub compaction_type: Option<CompactionType>,
402}
403
404impl EnforceSecret for IcebergConfig {
405    fn enforce_secret<'a>(
406        prop_iter: impl Iterator<Item = &'a str>,
407    ) -> crate::error::ConnectorResult<()> {
408        for prop in prop_iter {
409            IcebergCommon::enforce_one(prop)?;
410        }
411        Ok(())
412    }
413
414    fn enforce_one(prop: &str) -> crate::error::ConnectorResult<()> {
415        IcebergCommon::enforce_one(prop)
416    }
417}
418
419impl IcebergConfig {
420    pub fn from_btreemap(values: BTreeMap<String, String>) -> Result<Self> {
421        let mut config =
422            serde_json::from_value::<IcebergConfig>(serde_json::to_value(&values).unwrap())
423                .map_err(|e| SinkError::Config(anyhow!(e)))?;
424
425        if config.r#type != SINK_TYPE_APPEND_ONLY && config.r#type != SINK_TYPE_UPSERT {
426            return Err(SinkError::Config(anyhow!(
427                "`{}` must be {} or {}",
428                SINK_TYPE_OPTION,
429                SINK_TYPE_APPEND_ONLY,
430                SINK_TYPE_UPSERT
431            )));
432        }
433
434        if config.r#type == SINK_TYPE_UPSERT {
435            if let Some(primary_key) = &config.primary_key {
436                if primary_key.is_empty() {
437                    return Err(SinkError::Config(anyhow!(
438                        "`primary_key` must not be empty in {}",
439                        SINK_TYPE_UPSERT
440                    )));
441                }
442            } else {
443                return Err(SinkError::Config(anyhow!(
444                    "Must set `primary_key` in {}",
445                    SINK_TYPE_UPSERT
446                )));
447            }
448        }
449
450        // All configs starting with "catalog." are treated as Java catalog configs.
451        config.java_catalog_props = values
452            .iter()
453            .filter(|(k, _v)| {
454                k.starts_with("catalog.")
455                    && k != &"catalog.uri"
456                    && k != &"catalog.type"
457                    && k != &"catalog.name"
458                    && k != &"catalog.header"
459            })
460            .map(|(k, v)| (k[8..].to_string(), v.clone()))
461            .collect();
462
463        if config.commit_checkpoint_interval == 0 {
464            return Err(SinkError::Config(anyhow!(
465                "`commit_checkpoint_interval` must be greater than 0"
466            )));
467        }
468
469        Ok(config)
470    }
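
    // A minimal sketch of how these options arrive here. Option names other than `type`,
    // `primary_key`, and the `catalog.*` prefix come from the flattened `IcebergCommon` /
    // `IcebergTableIdentifier` structs and are assumptions in this example:
    //
    //     let props: BTreeMap<String, String> = [
    //         ("type", "upsert"),
    //         ("primary_key", "id"),
    //         ("catalog.type", "rest"),
    //         ("catalog.uri", "http://localhost:8181"),
    //         ("database.name", "demo_db"),
    //         ("table.name", "demo_table"),
    //     ]
    //     .into_iter()
    //     .map(|(k, v)| (k.to_owned(), v.to_owned()))
    //     .collect();
    //     let config = IcebergConfig::from_btreemap(props)?;
    //     assert_eq!(config.r#type, "upsert");
    //
    // Any `catalog.*` key other than `catalog.uri`, `catalog.type`, `catalog.name`, and
    // `catalog.header` ends up in `java_catalog_props` with the `catalog.` prefix stripped.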
471
472    pub fn catalog_type(&self) -> &str {
473        self.common.catalog_type()
474    }
475
476    pub async fn load_table(&self) -> Result<Table> {
477        self.common
478            .load_table(&self.table, &self.java_catalog_props)
479            .await
480            .map_err(Into::into)
481    }
482
483    pub async fn create_catalog(&self) -> Result<Arc<dyn Catalog>> {
484        self.common
485            .create_catalog(&self.java_catalog_props)
486            .await
487            .map_err(Into::into)
488    }
489
490    pub fn full_table_name(&self) -> Result<TableIdent> {
491        self.table.to_table_ident().map_err(Into::into)
492    }
493
494    pub fn catalog_name(&self) -> String {
495        self.common.catalog_name()
496    }
497
498    pub fn compaction_interval_sec(&self) -> u64 {
499        // defaults to 1 hour
500        self.compaction_interval_sec.unwrap_or(3600)
501    }
502
503    /// Calculate the timestamp (in milliseconds) before which snapshots should be expired
504    /// Returns `current_time_ms` - `max_age_millis`
505    pub fn snapshot_expiration_timestamp_ms(&self, current_time_ms: i64) -> Option<i64> {
506        self.snapshot_expiration_max_age_millis
507            .map(|max_age_millis| current_time_ms - max_age_millis)
508    }
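
    // For example, with `snapshot_expiration_max_age_millis = Some(86_400_000)` (one day),
    // calling this with the current epoch milliseconds returns `Some(now_ms - 86_400_000)`,
    // and snapshots older than that cutoff become eligible for expiration.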
509
510    pub fn trigger_snapshot_count(&self) -> usize {
511        self.trigger_snapshot_count.unwrap_or(16)
512    }
513
514    pub fn small_files_threshold_mb(&self) -> u64 {
515        self.small_files_threshold_mb.unwrap_or(64)
516    }
517
518    pub fn delete_files_count_threshold(&self) -> usize {
519        self.delete_files_count_threshold.unwrap_or(256)
520    }
521
522    pub fn target_file_size_mb(&self) -> u64 {
523        self.target_file_size_mb.unwrap_or(1024)
524    }
525
526    /// Get the configured compaction type.
527    /// Returns the stored value, or `CompactionType::Full` if not set.
528    pub fn compaction_type(&self) -> CompactionType {
529        self.compaction_type.unwrap_or_default()
530    }
531}
532
533pub struct IcebergSink {
534    pub config: IcebergConfig,
535    param: SinkParam,
536    // In upsert mode, it is never None or empty.
537    unique_column_ids: Option<Vec<usize>>,
538}
539
540impl EnforceSecret for IcebergSink {
541    fn enforce_secret<'a>(
542        prop_iter: impl Iterator<Item = &'a str>,
543    ) -> crate::error::ConnectorResult<()> {
544        for prop in prop_iter {
545            IcebergConfig::enforce_one(prop)?;
546        }
547        Ok(())
548    }
549}
550
551impl TryFrom<SinkParam> for IcebergSink {
552    type Error = SinkError;
553
554    fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
555        let config = IcebergConfig::from_btreemap(param.properties.clone())?;
556        IcebergSink::new(config, param)
557    }
558}
559
560impl Debug for IcebergSink {
561    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
562        f.debug_struct("IcebergSink")
563            .field("config", &self.config)
564            .finish()
565    }
566}
567
568async fn create_and_validate_table_impl(
569    config: &IcebergConfig,
570    param: &SinkParam,
571) -> Result<Table> {
572    if config.create_table_if_not_exists {
573        create_table_if_not_exists_impl(config, param).await?;
574    }
575
576    let table = config
577        .load_table()
578        .await
579        .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
580
581    let sink_schema = param.schema();
582    let iceberg_arrow_schema = schema_to_arrow_schema(table.metadata().current_schema())
583        .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
584
585    try_matches_arrow_schema(&sink_schema, &iceberg_arrow_schema)
586        .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
587
588    Ok(table)
589}
590
591async fn create_table_if_not_exists_impl(config: &IcebergConfig, param: &SinkParam) -> Result<()> {
592    let catalog = config.create_catalog().await?;
593    let namespace = if let Some(database_name) = config.table.database_name() {
594        let namespace = NamespaceIdent::new(database_name.to_owned());
595        if !catalog
596            .namespace_exists(&namespace)
597            .await
598            .map_err(|e| SinkError::Iceberg(anyhow!(e)))?
599        {
600            catalog
601                .create_namespace(&namespace, HashMap::default())
602                .await
603                .map_err(|e| SinkError::Iceberg(anyhow!(e)))
604                .context("failed to create iceberg namespace")?;
605        }
606        namespace
607    } else {
608        bail!("database name must be set if you want to create a table")
609    };
610
611    let table_id = config
612        .full_table_name()
613        .context("Unable to parse table name")?;
614    if !catalog
615        .table_exists(&table_id)
616        .await
617        .map_err(|e| SinkError::Iceberg(anyhow!(e)))?
618    {
619        let iceberg_create_table_arrow_convert = IcebergCreateTableArrowConvert::default();
620        // convert risingwave schema -> arrow schema -> iceberg schema
621        let arrow_fields = param
622            .columns
623            .iter()
624            .map(|column| {
625                Ok(iceberg_create_table_arrow_convert
626                    .to_arrow_field(&column.name, &column.data_type)
627                    .map_err(|e| SinkError::Iceberg(anyhow!(e)))
628                    .context(format!(
629                        "failed to convert {}: {} to arrow type",
630                        &column.name, &column.data_type
631                    ))?)
632            })
633            .collect::<Result<Vec<ArrowField>>>()?;
634        let arrow_schema = arrow_schema_iceberg::Schema::new(arrow_fields);
635        let iceberg_schema = iceberg::arrow::arrow_schema_to_schema(&arrow_schema)
636            .map_err(|e| SinkError::Iceberg(anyhow!(e)))
637            .context("failed to convert arrow schema to iceberg schema")?;
638
639        let location = {
640            let mut names = namespace.clone().inner();
641            names.push(config.table.table_name().to_owned());
642            match &config.common.warehouse_path {
643                Some(warehouse_path) => {
644                    let is_s3_tables = warehouse_path.starts_with("arn:aws:s3tables");
645                    let url = Url::parse(warehouse_path);
646                    if url.is_err() || is_s3_tables {
647                        // For a rest catalog, the warehouse_path could be a warehouse name.
648                        // In this case, we leave the location unset and let the catalog decide it when creating the table.
649                        if config.common.catalog_type() == "rest"
650                            || config.common.catalog_type() == "rest_rust"
651                        {
652                            None
653                        } else {
654                            bail!(format!("Invalid warehouse path: {}", warehouse_path))
655                        }
656                    } else if warehouse_path.ends_with('/') {
657                        Some(format!("{}{}", warehouse_path, names.join("/")))
658                    } else {
659                        Some(format!("{}/{}", warehouse_path, names.join("/")))
660                    }
661                }
662                None => None,
663            }
664        };
665
666        let partition_spec = match &config.partition_by {
667            Some(partition_by) => {
668                let mut partition_fields = Vec::<UnboundPartitionField>::new();
669                for (i, (column, transform)) in parse_partition_by_exprs(partition_by.clone())?
670                    .into_iter()
671                    .enumerate()
672                {
673                    match iceberg_schema.field_id_by_name(&column) {
674                        Some(id) => partition_fields.push(
675                            UnboundPartitionField::builder()
676                                .source_id(id)
677                                .transform(transform)
678                                .name(format!("_p_{}", column))
679                                .field_id(PARTITION_DATA_ID_START + i as i32)
680                                .build(),
681                        ),
682                        None => bail!(format!(
683                            "Partition source column does not exist in schema: {}",
684                            column
685                        )),
686                    };
687                }
688                Some(
689                    UnboundPartitionSpec::builder()
690                        .with_spec_id(0)
691                        .add_partition_fields(partition_fields)
692                        .map_err(|e| SinkError::Iceberg(anyhow!(e)))
693                        .context("failed to add partition columns")?
694                        .build(),
695                )
696            }
697            None => None,
698        };
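
        // A sketch of the intended mapping, assuming `parse_partition_by_exprs` (defined
        // elsewhere in this module) accepts a column list with optional transforms, e.g.
        // `partition_by = "c1, bucket(16, c2), day(c3)"`: each entry becomes an
        // `UnboundPartitionField` named `_p_<column>` with field ids 1000, 1001, 1002, ...
        // (i.e. `PARTITION_DATA_ID_START + index`).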
699
700        let table_creation_builder = TableCreation::builder()
701            .name(config.table.table_name().to_owned())
702            .schema(iceberg_schema);
703
704        let table_creation = match (location, partition_spec) {
705            (Some(location), Some(partition_spec)) => table_creation_builder
706                .location(location)
707                .partition_spec(partition_spec)
708                .build(),
709            (Some(location), None) => table_creation_builder.location(location).build(),
710            (None, Some(partition_spec)) => table_creation_builder
711                .partition_spec(partition_spec)
712                .build(),
713            (None, None) => table_creation_builder.build(),
714        };
715
716        catalog
717            .create_table(&namespace, table_creation)
718            .await
719            .map_err(|e| SinkError::Iceberg(anyhow!(e)))
720            .context("failed to create iceberg table")?;
721    }
722    Ok(())
723}
724
725impl IcebergSink {
726    pub async fn create_and_validate_table(&self) -> Result<Table> {
727        create_and_validate_table_impl(&self.config, &self.param).await
728    }
729
730    pub async fn create_table_if_not_exists(&self) -> Result<()> {
731        create_table_if_not_exists_impl(&self.config, &self.param).await
732    }
733
734    pub fn new(config: IcebergConfig, param: SinkParam) -> Result<Self> {
735        let unique_column_ids = if config.r#type == SINK_TYPE_UPSERT && !config.force_append_only {
736            if let Some(pk) = &config.primary_key {
737                let mut unique_column_ids = Vec::with_capacity(pk.len());
738                for col_name in pk {
739                    let id = param
740                        .columns
741                        .iter()
742                        .find(|col| col.name.as_str() == col_name)
743                        .ok_or_else(|| {
744                            SinkError::Config(anyhow!(
745                                "Primary key column {} not found in sink schema",
746                                col_name
747                            ))
748                        })?
749                        .column_id
750                        .get_id() as usize;
751                    unique_column_ids.push(id);
752                }
753                Some(unique_column_ids)
754            } else {
755                unreachable!()
756            }
757        } else {
758            None
759        };
760        Ok(Self {
761            config,
762            param,
763            unique_column_ids,
764        })
765    }
766}
767
768impl Sink for IcebergSink {
769    type LogSinker = CoordinatedLogSinker<IcebergSinkWriter>;
770
771    const SINK_NAME: &'static str = ICEBERG_SINK;
772
773    async fn validate(&self) -> Result<()> {
774        if "snowflake".eq_ignore_ascii_case(self.config.catalog_type()) {
775            bail!("Snowflake catalog only supports iceberg sources");
776        }
777
778        if "glue".eq_ignore_ascii_case(self.config.catalog_type()) {
779            risingwave_common::license::Feature::IcebergSinkWithGlue
780                .check_available()
781                .map_err(|e| anyhow::anyhow!(e))?;
782        }
783
784        // Validate compaction type configuration
785        let compaction_type = self.config.compaction_type();
786
787        // Check COW mode constraints
788        // COW mode only supports 'full' compaction type
789        if self.config.write_mode == IcebergWriteMode::CopyOnWrite
790            && compaction_type != CompactionType::Full
791        {
792            bail!(
793                "'copy-on-write' mode only supports 'full' compaction type, got: '{}'",
794                compaction_type
795            );
796        }
797
798        match compaction_type {
799            CompactionType::SmallFiles => {
800                // 1. check license
801                risingwave_common::license::Feature::IcebergCompaction
802                    .check_available()
803                    .map_err(|e| anyhow::anyhow!(e))?;
804
805                // 2. check write mode
806                if self.config.write_mode != IcebergWriteMode::MergeOnRead {
807                    bail!(
808                        "'small-files' compaction type only supports 'merge-on-read' write mode, got: '{}'",
809                        self.config.write_mode
810                    );
811                }
812
813                // 3. check conflicting parameters
814                if self.config.delete_files_count_threshold.is_some() {
815                    bail!(
816                        "`compaction.delete_files_count_threshold` is not supported for 'small-files' compaction type"
817                    );
818                }
819            }
820            CompactionType::FilesWithDelete => {
821                // 1. check license
822                risingwave_common::license::Feature::IcebergCompaction
823                    .check_available()
824                    .map_err(|e| anyhow::anyhow!(e))?;
825
826                // 2. check write mode
827                if self.config.write_mode != IcebergWriteMode::MergeOnRead {
828                    bail!(
829                        "'files-with-delete' compaction type only supports 'merge-on-read' write mode, got: '{}'",
830                        self.config.write_mode
831                    );
832                }
833
834                // 3. check conflicting parameters
835                if self.config.small_files_threshold_mb.is_some() {
836                    bail!(
837                        "`compaction.small_files_threshold_mb` must not be set for 'files-with-delete' compaction type"
838                    );
839                }
840            }
841            CompactionType::Full => {
842                // Full compaction has no special requirements
843            }
844        }
845
846        let _ = self.create_and_validate_table().await?;
847        Ok(())
848    }
849
850    fn validate_alter_config(config: &BTreeMap<String, String>) -> Result<()> {
851        let iceberg_config = IcebergConfig::from_btreemap(config.clone())?;
852
853        // Validate compaction interval
854        if let Some(compaction_interval) = iceberg_config.compaction_interval_sec {
855            if iceberg_config.enable_compaction && compaction_interval == 0 {
856                bail!(
857                    "`compaction_interval_sec` must be greater than 0 when `enable_compaction` is true"
858                );
859            }
860
861            // log compaction_interval
862            tracing::info!(
863                "Alter config compaction_interval set to {} seconds",
864                compaction_interval
865            );
866        }
867
868        // Validate max snapshots
869        if let Some(max_snapshots) = iceberg_config.max_snapshots_num_before_compaction
870            && max_snapshots < 1
871        {
872            bail!(
873                "`compaction.max_snapshots_num` must be greater than 0, got: {}",
874                max_snapshots
875            );
876        }
877
878        Ok(())
879    }
880
881    async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
882        let writer = IcebergSinkWriter::new(
883            self.config.clone(),
884            self.param.clone(),
885            writer_param.clone(),
886            self.unique_column_ids.clone(),
887        );
888
889        let commit_checkpoint_interval =
890            NonZeroU64::new(self.config.commit_checkpoint_interval).expect(
891                "commit_checkpoint_interval should be greater than 0, and it should be checked in config validation",
892            );
893        let log_sinker = CoordinatedLogSinker::new(
894            &writer_param,
895            self.param.clone(),
896            writer,
897            commit_checkpoint_interval,
898        )
899        .await?;
900
901        Ok(log_sinker)
902    }
903
904    fn is_coordinated_sink(&self) -> bool {
905        true
906    }
907
908    async fn new_coordinator(
909        &self,
910        iceberg_compact_stat_sender: Option<UnboundedSender<IcebergSinkCompactionUpdate>>,
911    ) -> Result<SinkCommitCoordinator> {
912        let catalog = self.config.create_catalog().await?;
913        let table = self.create_and_validate_table().await?;
914        let coordinator = IcebergSinkCommitter {
915            catalog,
916            table,
917            last_commit_epoch: 0,
918            sink_id: self.param.sink_id,
919            config: self.config.clone(),
920            param: self.param.clone(),
921            commit_retry_num: self.config.commit_retry_num,
922            iceberg_compact_stat_sender,
923        };
924        if self.config.is_exactly_once.unwrap_or_default() {
925            Ok(SinkCommitCoordinator::TwoPhase(Box::new(coordinator)))
926        } else {
927            Ok(SinkCommitCoordinator::SinglePhase(Box::new(coordinator)))
928        }
929    }
930}
931
932/// `None` means no projection.
933/// `Prepare` holds the extra partition column index.
934/// `Done` holds the projection index vec.
935///
936/// The `ProjectIdxVec` is evaluated lazily: when we first encounter the `Prepare` state, we use the data chunk schema
937/// to build the projection index vec.
938enum ProjectIdxVec {
939    None,
940    Prepare(usize),
941    Done(Vec<usize>),
942}
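
// For example, with an extra partition column at index 2 and a 4-column chunk,
// `Prepare(2)` is resolved on first use into `Done(vec![0, 1, 3])`, which is then reused
// to project that column away from every subsequent chunk.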
943
944type DataFileWriterBuilderType =
945    DataFileWriterBuilder<ParquetWriterBuilder, DefaultLocationGenerator, DefaultFileNameGenerator>;
946type PositionDeleteFileWriterBuilderType = PositionDeleteFileWriterBuilder<
947    ParquetWriterBuilder,
948    DefaultLocationGenerator,
949    DefaultFileNameGenerator,
950>;
951type EqualityDeleteFileWriterBuilderType = EqualityDeleteFileWriterBuilder<
952    ParquetWriterBuilder,
953    DefaultLocationGenerator,
954    DefaultFileNameGenerator,
955>;
956
957#[derive(Clone)]
958struct TaskWriterBuilderWrapper<B: IcebergWriterBuilder> {
959    inner: B,
960    fanout_enabled: bool,
961    schema: IcebergSchemaRef,
962    partition_spec: PartitionSpecRef,
963    compute_partition: bool,
964}
965
966impl<B: IcebergWriterBuilder> TaskWriterBuilderWrapper<B> {
967    fn new(
968        inner: B,
969        fanout_enabled: bool,
970        schema: IcebergSchemaRef,
971        partition_spec: PartitionSpecRef,
972        compute_partition: bool,
973    ) -> Self {
974        Self {
975            inner,
976            fanout_enabled,
977            schema,
978            partition_spec,
979            compute_partition,
980        }
981    }
982
983    fn build(self) -> iceberg::Result<TaskWriter<B>> {
984        let partition_splitter = match (
985            self.partition_spec.is_unpartitioned(),
986            self.compute_partition,
987        ) {
988            (true, _) => None,
989            (false, true) => Some(RecordBatchPartitionSplitter::new_with_computed_values(
990                self.schema.clone(),
991                self.partition_spec.clone(),
992            )?),
993            (false, false) => Some(RecordBatchPartitionSplitter::new_with_precomputed_values(
994                self.schema.clone(),
995                self.partition_spec.clone(),
996            )?),
997        };
998
999        Ok(TaskWriter::new_with_partition_splitter(
1000            self.inner,
1001            self.fanout_enabled,
1002            self.schema,
1003            self.partition_spec,
1004            partition_splitter,
1005        ))
1006    }
1007}
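
// Splitter selection above, summarized:
//   - unpartitioned table                  -> no partition splitter
//   - partitioned, compute_partition=true  -> splitter computes partition values from the batch
//   - partitioned, compute_partition=false -> splitter expects precomputed partition values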
1008
1009pub enum IcebergSinkWriter {
1010    Created(IcebergSinkWriterArgs),
1011    Initialized(IcebergSinkWriterInner),
1012}
1013
1014pub struct IcebergSinkWriterArgs {
1015    config: IcebergConfig,
1016    sink_param: SinkParam,
1017    writer_param: SinkWriterParam,
1018    unique_column_ids: Option<Vec<usize>>,
1019}
1020
1021pub struct IcebergSinkWriterInner {
1022    writer: IcebergWriterDispatch,
1023    arrow_schema: SchemaRef,
1024    // See the comments on `IcebergWriterMetrics` below.
1025    metrics: IcebergWriterMetrics,
1026    // State of iceberg table for this writer
1027    table: Table,
1028    // For chunks with an extra partition column, we should remove that column before writing.
1029    // This projection index vec is cached to avoid recreating it for every chunk.
1030    project_idx_vec: ProjectIdxVec,
1031}
1032
1033#[allow(clippy::type_complexity)]
1034enum IcebergWriterDispatch {
1035    Append {
1036        writer: Option<Box<dyn IcebergWriter>>,
1037        writer_builder:
1038            TaskWriterBuilderWrapper<MonitoredGeneralWriterBuilder<DataFileWriterBuilderType>>,
1039    },
1040    Upsert {
1041        writer: Option<Box<dyn IcebergWriter>>,
1042        writer_builder: TaskWriterBuilderWrapper<
1043            MonitoredGeneralWriterBuilder<
1044                DeltaWriterBuilder<
1045                    DataFileWriterBuilderType,
1046                    PositionDeleteFileWriterBuilderType,
1047                    EqualityDeleteFileWriterBuilderType,
1048                >,
1049            >,
1050        >,
1051        arrow_schema_with_op_column: SchemaRef,
1052    },
1053}
1054
1055impl IcebergWriterDispatch {
1056    pub fn get_writer(&mut self) -> Option<&mut Box<dyn IcebergWriter>> {
1057        match self {
1058            IcebergWriterDispatch::Append { writer, .. }
1059            | IcebergWriterDispatch::Upsert { writer, .. } => writer.as_mut(),
1060        }
1061    }
1062}
1063
1064pub struct IcebergWriterMetrics {
1065    // NOTE: These two metrics are not used directly by us; they are only kept for lifecycle management.
1066    // They are actually used in `PrometheusWriterBuilder`:
1067    //     WriterMetrics::new(write_qps.deref().clone(), write_latency.deref().clone())
1068    // We keep them here so that the guards clean up the labels from the metrics registry when dropped.
1069    _write_qps: LabelGuardedIntCounter,
1070    _write_latency: LabelGuardedHistogram,
1071    write_bytes: LabelGuardedIntCounter,
1072}
1073
1074impl IcebergSinkWriter {
1075    pub fn new(
1076        config: IcebergConfig,
1077        sink_param: SinkParam,
1078        writer_param: SinkWriterParam,
1079        unique_column_ids: Option<Vec<usize>>,
1080    ) -> Self {
1081        Self::Created(IcebergSinkWriterArgs {
1082            config,
1083            sink_param,
1084            writer_param,
1085            unique_column_ids,
1086        })
1087    }
1088}
1089
1090impl IcebergSinkWriterInner {
1091    fn build_append_only(table: Table, writer_param: &SinkWriterParam) -> Result<Self> {
1092        let SinkWriterParam {
1093            extra_partition_col_idx,
1094            actor_id,
1095            sink_id,
1096            sink_name,
1097            ..
1098        } = writer_param;
1099        let metrics_labels = [
1100            &actor_id.to_string(),
1101            &sink_id.to_string(),
1102            sink_name.as_str(),
1103        ];
1104
1105        // Metrics
1106        let write_qps = GLOBAL_SINK_METRICS
1107            .iceberg_write_qps
1108            .with_guarded_label_values(&metrics_labels);
1109        let write_latency = GLOBAL_SINK_METRICS
1110            .iceberg_write_latency
1111            .with_guarded_label_values(&metrics_labels);
1112        // # TODO
1113        // Unused. Add this metric later.
1114        let _rolling_unflushed_data_file = GLOBAL_SINK_METRICS
1115            .iceberg_rolling_unflushed_data_file
1116            .with_guarded_label_values(&metrics_labels);
1117        let write_bytes = GLOBAL_SINK_METRICS
1118            .iceberg_write_bytes
1119            .with_guarded_label_values(&metrics_labels);
1120
1121        let schema = table.metadata().current_schema();
1122        let partition_spec = table.metadata().default_partition_spec();
1123        let fanout_enabled = !partition_spec.fields().is_empty();
1124
1125        // To avoid duplicate file names, each sink creation generates a unique UUID as the file name suffix.
1126        let unique_uuid_suffix = Uuid::now_v7();
1127
1128        let parquet_writer_properties = WriterProperties::builder()
1129            .set_max_row_group_size(
1130                writer_param
1131                    .streaming_config
1132                    .developer
1133                    .iceberg_sink_write_parquet_max_row_group_rows,
1134            )
1135            .build();
1136
1137        let parquet_writer_builder =
1138            ParquetWriterBuilder::new(parquet_writer_properties, schema.clone());
1139        let rolling_builder = RollingFileWriterBuilder::new_with_default_file_size(
1140            parquet_writer_builder,
1141            table.file_io().clone(),
1142            DefaultLocationGenerator::new(table.metadata().clone())
1143                .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1144            DefaultFileNameGenerator::new(
1145                writer_param.actor_id.to_string(),
1146                Some(unique_uuid_suffix.to_string()),
1147                iceberg::spec::DataFileFormat::Parquet,
1148            ),
1149        );
1150        let data_file_builder = DataFileWriterBuilder::new(rolling_builder);
1151        let monitored_builder = MonitoredGeneralWriterBuilder::new(
1152            data_file_builder,
1153            write_qps.clone(),
1154            write_latency.clone(),
1155        );
1156        let writer_builder = TaskWriterBuilderWrapper::new(
1157            monitored_builder,
1158            fanout_enabled,
1159            schema.clone(),
1160            partition_spec.clone(),
1161            true,
1162        );
1163        let inner_writer = Some(Box::new(
1164            writer_builder
1165                .clone()
1166                .build()
1167                .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1168        ) as Box<dyn IcebergWriter>);
1169        Ok(Self {
1170            arrow_schema: Arc::new(
1171                schema_to_arrow_schema(table.metadata().current_schema())
1172                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1173            ),
1174            metrics: IcebergWriterMetrics {
1175                _write_qps: write_qps,
1176                _write_latency: write_latency,
1177                write_bytes,
1178            },
1179            writer: IcebergWriterDispatch::Append {
1180                writer: inner_writer,
1181                writer_builder,
1182            },
1183            table,
1184            project_idx_vec: {
1185                if let Some(extra_partition_col_idx) = extra_partition_col_idx {
1186                    ProjectIdxVec::Prepare(*extra_partition_col_idx)
1187                } else {
1188                    ProjectIdxVec::None
1189                }
1190            },
1191        })
1192    }
1193
1194    fn build_upsert(
1195        table: Table,
1196        unique_column_ids: Vec<usize>,
1197        writer_param: &SinkWriterParam,
1198    ) -> Result<Self> {
1199        let SinkWriterParam {
1200            extra_partition_col_idx,
1201            actor_id,
1202            sink_id,
1203            sink_name,
1204            ..
1205        } = writer_param;
1206        let metrics_labels = [
1207            &actor_id.to_string(),
1208            &sink_id.to_string(),
1209            sink_name.as_str(),
1210        ];
1211        let unique_column_ids: Vec<_> = unique_column_ids.into_iter().map(|id| id as i32).collect();
1212
1213        // Metrics
1214        let write_qps = GLOBAL_SINK_METRICS
1215            .iceberg_write_qps
1216            .with_guarded_label_values(&metrics_labels);
1217        let write_latency = GLOBAL_SINK_METRICS
1218            .iceberg_write_latency
1219            .with_guarded_label_values(&metrics_labels);
1220        // # TODO
1221        // Unused. Add this metric later.
1222        let _rolling_unflushed_data_file = GLOBAL_SINK_METRICS
1223            .iceberg_rolling_unflushed_data_file
1224            .with_guarded_label_values(&metrics_labels);
1225        let write_bytes = GLOBAL_SINK_METRICS
1226            .iceberg_write_bytes
1227            .with_guarded_label_values(&metrics_labels);
1228
1229        // Determine the current schema and the default partition spec
1230        let schema = table.metadata().current_schema();
1231        let partition_spec = table.metadata().default_partition_spec();
1232        let fanout_enabled = !partition_spec.fields().is_empty();
1233
1234        // To avoid duplicate file names, each sink creation generates a unique UUID as the file name suffix.
1235        let unique_uuid_suffix = Uuid::now_v7();
1236
1237        let parquet_writer_properties = WriterProperties::builder()
1238            .set_max_row_group_size(
1239                writer_param
1240                    .streaming_config
1241                    .developer
1242                    .iceberg_sink_write_parquet_max_row_group_rows,
1243            )
1244            .build();
1245
1246        let data_file_builder = {
1247            let parquet_writer_builder =
1248                ParquetWriterBuilder::new(parquet_writer_properties.clone(), schema.clone());
1249            let rolling_writer_builder = RollingFileWriterBuilder::new_with_default_file_size(
1250                parquet_writer_builder,
1251                table.file_io().clone(),
1252                DefaultLocationGenerator::new(table.metadata().clone())
1253                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1254                DefaultFileNameGenerator::new(
1255                    writer_param.actor_id.to_string(),
1256                    Some(unique_uuid_suffix.to_string()),
1257                    iceberg::spec::DataFileFormat::Parquet,
1258                ),
1259            );
1260            DataFileWriterBuilder::new(rolling_writer_builder)
1261        };
1262        let position_delete_builder = {
1263            let parquet_writer_builder = ParquetWriterBuilder::new(
1264                parquet_writer_properties.clone(),
1265                POSITION_DELETE_SCHEMA.clone().into(),
1266            );
1267            let rolling_writer_builder = RollingFileWriterBuilder::new_with_default_file_size(
1268                parquet_writer_builder,
1269                table.file_io().clone(),
1270                DefaultLocationGenerator::new(table.metadata().clone())
1271                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1272                DefaultFileNameGenerator::new(
1273                    writer_param.actor_id.to_string(),
1274                    Some(format!("pos-del-{}", unique_uuid_suffix)),
1275                    iceberg::spec::DataFileFormat::Parquet,
1276                ),
1277            );
1278            PositionDeleteFileWriterBuilder::new(rolling_writer_builder)
1279        };
1280        let equality_delete_builder = {
1281            let config = EqualityDeleteWriterConfig::new(
1282                unique_column_ids.clone(),
1283                table.metadata().current_schema().clone(),
1284            )
1285            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
1286            let parquet_writer_builder = ParquetWriterBuilder::new(
1287                parquet_writer_properties,
1288                Arc::new(
1289                    arrow_schema_to_schema(config.projected_arrow_schema_ref())
1290                        .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1291                ),
1292            );
1293            let rolling_writer_builder = RollingFileWriterBuilder::new_with_default_file_size(
1294                parquet_writer_builder,
1295                table.file_io().clone(),
1296                DefaultLocationGenerator::new(table.metadata().clone())
1297                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1298                DefaultFileNameGenerator::new(
1299                    writer_param.actor_id.to_string(),
1300                    Some(format!("eq-del-{}", unique_uuid_suffix)),
1301                    iceberg::spec::DataFileFormat::Parquet,
1302                ),
1303            );
1304
1305            EqualityDeleteFileWriterBuilder::new(rolling_writer_builder, config)
1306        };
1307        let delta_builder = DeltaWriterBuilder::new(
1308            data_file_builder,
1309            position_delete_builder,
1310            equality_delete_builder,
1311            unique_column_ids,
1312            schema.clone(),
1313        );
1314        let original_arrow_schema = Arc::new(
1315            schema_to_arrow_schema(table.metadata().current_schema())
1316                .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1317        );
1318        let schema_with_extra_op_column = {
1319            let mut new_fields = original_arrow_schema.fields().iter().cloned().collect_vec();
1320            new_fields.push(Arc::new(ArrowField::new(
1321                "op".to_owned(),
1322                ArrowDataType::Int32,
1323                false,
1324            )));
1325            Arc::new(ArrowSchema::new(new_fields))
1326        };
1327        let writer_builder = TaskWriterBuilderWrapper::new(
1328            MonitoredGeneralWriterBuilder::new(
1329                delta_builder,
1330                write_qps.clone(),
1331                write_latency.clone(),
1332            ),
1333            fanout_enabled,
1334            schema.clone(),
1335            partition_spec.clone(),
1336            true,
1337        );
1338        let inner_writer = Some(Box::new(
1339            writer_builder
1340                .clone()
1341                .build()
1342                .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1343        ) as Box<dyn IcebergWriter>);
1344        Ok(Self {
1345            arrow_schema: original_arrow_schema,
1346            metrics: IcebergWriterMetrics {
1347                _write_qps: write_qps,
1348                _write_latency: write_latency,
1349                write_bytes,
1350            },
1351            table,
1352            writer: IcebergWriterDispatch::Upsert {
1353                writer: inner_writer,
1354                writer_builder,
1355                arrow_schema_with_op_column: schema_with_extra_op_column,
1356            },
1357            project_idx_vec: {
1358                if let Some(extra_partition_col_idx) = extra_partition_col_idx {
1359                    ProjectIdxVec::Prepare(*extra_partition_col_idx)
1360                } else {
1361                    ProjectIdxVec::None
1362                }
1363            },
1364        })
1365    }
1366}
1367
1368#[async_trait]
1369impl SinkWriter for IcebergSinkWriter {
1370    type CommitMetadata = Option<SinkMetadata>;
1371
1372    /// Begin a new epoch
1373    async fn begin_epoch(&mut self, _epoch: u64) -> Result<()> {
1374        let Self::Created(args) = self else {
1375            return Ok(());
1376        };
1377
1378        let table = create_and_validate_table_impl(&args.config, &args.sink_param).await?;
1379        let inner = match &args.unique_column_ids {
1380            Some(unique_column_ids) => IcebergSinkWriterInner::build_upsert(
1381                table,
1382                unique_column_ids.clone(),
1383                &args.writer_param,
1384            )?,
1385            None => IcebergSinkWriterInner::build_append_only(table, &args.writer_param)?,
1386        };
1387
1388        *self = IcebergSinkWriter::Initialized(inner);
1389        Ok(())
1390    }
1391
1392    /// Write a stream chunk to the sink.
1393    async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> {
1394        let Self::Initialized(inner) = self else {
1395            unreachable!("IcebergSinkWriter should be initialized before writing");
1396        };
1397
1398        // Try to build writer if it's None.
1399        match &mut inner.writer {
1400            IcebergWriterDispatch::Append {
1401                writer,
1402                writer_builder,
1403            } => {
1404                if writer.is_none() {
1405                    *writer = Some(Box::new(
1406                        writer_builder
1407                            .clone()
1408                            .build()
1409                            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1410                    ));
1411                }
1412            }
1413            IcebergWriterDispatch::Upsert {
1414                writer,
1415                writer_builder,
1416                ..
1417            } => {
1418                if writer.is_none() {
1419                    *writer = Some(Box::new(
1420                        writer_builder
1421                            .clone()
1422                            .build()
1423                            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1424                    ));
1425                }
1426            }
1427        };
1428
1429        // Process the chunk.
1430        let (mut chunk, ops) = chunk.compact_vis().into_parts();
1431        match &mut inner.project_idx_vec {
1432            ProjectIdxVec::None => {}
1433            ProjectIdxVec::Prepare(idx) => {
1434                if *idx >= chunk.columns().len() {
1435                    return Err(SinkError::Iceberg(anyhow!(
1436                        "invalid extra partition column index {}",
1437                        idx
1438                    )));
1439                }
1440                let project_idx_vec = (0..*idx)
1441                    .chain(*idx + 1..chunk.columns().len())
1442                    .collect_vec();
1443                chunk = chunk.project(&project_idx_vec);
1444                inner.project_idx_vec = ProjectIdxVec::Done(project_idx_vec);
1445            }
1446            ProjectIdxVec::Done(idx_vec) => {
1447                chunk = chunk.project(idx_vec);
1448            }
1449        }
1450        if ops.is_empty() {
1451            return Ok(());
1452        }
1453        let write_batch_size = chunk.estimated_heap_size();
1454        let batch = match &inner.writer {
1455            IcebergWriterDispatch::Append { .. } => {
1456                // Keep only the insert rows: the append-only writer ignores deletes.
1457                let filters =
1458                    chunk.visibility() & ops.iter().map(|op| *op == Op::Insert).collect::<Bitmap>();
1459                chunk.set_visibility(filters);
1460                IcebergArrowConvert
1461                    .to_record_batch(inner.arrow_schema.clone(), &chunk.compact_vis())
1462                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?
1463            }
1464            IcebergWriterDispatch::Upsert {
1465                arrow_schema_with_op_column,
1466                ..
1467            } => {
1468                let chunk = IcebergArrowConvert
1469                    .to_record_batch(inner.arrow_schema.clone(), &chunk)
1470                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
1471                let ops = Arc::new(Int32Array::from(
1472                    ops.iter()
1473                        .map(|op| match op {
1474                            Op::UpdateInsert | Op::Insert => INSERT_OP,
1475                            Op::UpdateDelete | Op::Delete => DELETE_OP,
1476                        })
1477                        .collect_vec(),
1478                ));
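                // Append an extra op column so the upsert (delta) writer can tell
                // inserts from deletes for each row in the batch.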
1479                let mut columns = chunk.columns().to_vec();
1480                columns.push(ops);
1481                RecordBatch::try_new(arrow_schema_with_op_column.clone(), columns)
1482                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?
1483            }
1484        };
1485
1486        let writer = inner.writer.get_writer().unwrap();
1487        writer
1488            .write(batch)
1489            .instrument_await("iceberg_write")
1490            .await
1491            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
1492        inner.metrics.write_bytes.inc_by(write_batch_size as _);
1493        Ok(())
1494    }
1495
1496    /// Receive a barrier and mark the end of current epoch. When `is_checkpoint` is true, the sink
1497    /// writer should commit the current epoch.
1498    async fn barrier(&mut self, is_checkpoint: bool) -> Result<Option<SinkMetadata>> {
1499        let Self::Initialized(inner) = self else {
1500            unreachable!("IcebergSinkWriter should be initialized before barrier");
1501        };
1502
1503        // Skip if this is not a checkpoint barrier.
1504        if !is_checkpoint {
1505            return Ok(None);
1506        }
1507
1508        let close_result = match &mut inner.writer {
1509            IcebergWriterDispatch::Append {
1510                writer,
1511                writer_builder,
1512            } => {
1513                let close_result = match writer.take() {
1514                    Some(mut writer) => {
1515                        Some(writer.close().instrument_await("iceberg_close").await)
1516                    }
1517                    _ => None,
1518                };
1519                match writer_builder.clone().build() {
1520                    Ok(new_writer) => {
1521                        *writer = Some(Box::new(new_writer));
1522                    }
1523                    Err(err) => {
1524                        // The old writer is closed but building a new one failed. Don't return the
1525                        // error here, since the current writer may have closed successfully; just log it.
1526                        warn!(error = %err.as_report(), "Failed to build new writer after close");
1527                    }
1528                }
1529                close_result
1530            }
1531            IcebergWriterDispatch::Upsert {
1532                writer,
1533                writer_builder,
1534                ..
1535            } => {
1536                let close_result = match writer.take() {
1537                    Some(mut writer) => {
1538                        Some(writer.close().instrument_await("iceberg_close").await)
1539                    }
1540                    _ => None,
1541                };
1542                match writer_builder.clone().build() {
1543                    Ok(new_writer) => {
1544                        *writer = Some(Box::new(new_writer));
1545                    }
1546                    Err(err) => {
1547                        // The old writer is closed but building a new one failed. Don't return the
1548                        // error here, since the current writer may have closed successfully; just log it.
1549                        warn!(error = %err.as_report(), "Failed to build new writer after close");
1550                    }
1551                }
1552                close_result
1553            }
1554        };
1555
1556        match close_result {
1557            Some(Ok(result)) => {
1558                let format_version = inner.table.metadata().format_version();
1559                let partition_type = inner.table.metadata().default_partition_type();
1560                let data_files = result
1561                    .into_iter()
1562                    .map(|f| {
1563                        // Truncate large column statistics BEFORE serialization
1564                        let truncated = truncate_datafile(f);
1565                        SerializedDataFile::try_from(truncated, partition_type, format_version)
1566                            .map_err(|err| SinkError::Iceberg(anyhow!(err)))
1567                    })
1568                    .collect::<Result<Vec<_>>>()?;
1569                Ok(Some(SinkMetadata::try_from(&IcebergCommitResult {
1570                    data_files,
1571                    schema_id: inner.table.metadata().current_schema_id(),
1572                    partition_spec_id: inner.table.metadata().default_partition_spec_id(),
1573                })?))
1574            }
1575            Some(Err(err)) => Err(SinkError::Iceberg(anyhow!(err))),
1576            None => Err(SinkError::Iceberg(anyhow!("No writer to close"))),
1577        }
1578    }
1579}
1580
1581const SCHEMA_ID: &str = "schema_id";
1582const PARTITION_SPEC_ID: &str = "partition_spec_id";
1583const DATA_FILES: &str = "data_files";
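
// The commit metadata exchanged between sink writers and the coordinator is a JSON
// object keyed by the constants above. An illustrative, hand-written example of its
// shape (the actual `data_files` entries are serialized `SerializedDataFile`s):
//
//     {
//         "schema_id": 0,
//         "partition_spec_id": 0,
//         "data_files": [ ... ]
//     }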
1584
1585/// Maximum size in bytes for a single column statistic (min/max value) kept in
1586/// `DataFile` metadata. Statistics larger than this are dropped to avoid metadata
1587/// bloat, which matters most for large fields such as JSONB, TEXT, and BINARY.
1588///
1589/// Without this cap, large column statistics can balloon the commit metadata to
1590/// gigabytes and cause OOM errors. We truncate at the `DataFile` level (before
1591/// serialization) by directly modifying the public `lower_bounds` and `upper_bounds`
1592/// fields, while still preserving statistics for small fields that benefit from
1593/// query optimization.
1596const MAX_COLUMN_STAT_SIZE: usize = 10240; // 10KB
1597
1598/// Truncate large column statistics from `DataFile` BEFORE serialization.
1599///
1600/// This function directly modifies `DataFile`'s `lower_bounds` and `upper_bounds`
1601/// to remove entries that exceed `MAX_COLUMN_STAT_SIZE`.
1602///
1603/// # Arguments
1604/// * `data_file` - A `DataFile` to process
1605///
1606/// # Returns
1607/// The modified `DataFile` with large statistics truncated
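///
/// # Example
///
/// A minimal sketch (not a compiled doctest); the field ids and value sizes below are
/// assumed purely for illustration:
///
/// ```ignore
/// // Suppose `data_file` carries a multi-megabyte JSONB lower bound for field 3
/// // and a tiny i32 bound for field 1.
/// let truncated = truncate_datafile(data_file);
/// assert!(!truncated.lower_bounds.contains_key(&3)); // oversized stat removed
/// assert!(truncated.lower_bounds.contains_key(&1)); // small stat kept
/// ```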
1608fn truncate_datafile(mut data_file: DataFile) -> DataFile {
1609    // Process lower_bounds - remove entries with large values
1610    data_file.lower_bounds.retain(|field_id, datum| {
1611        // Use to_bytes() to get the actual binary size without JSON serialization overhead
1612        let size = match datum.to_bytes() {
1613            Ok(bytes) => bytes.len(),
1614            Err(_) => 0,
1615        };
1616
1617        if size > MAX_COLUMN_STAT_SIZE {
1618            tracing::debug!(
1619                field_id = field_id,
1620                size = size,
1621                "Truncating large lower_bound statistic"
1622            );
1623            return false;
1624        }
1625        true
1626    });
1627
1628    // Process upper_bounds - remove entries with large values
1629    data_file.upper_bounds.retain(|field_id, datum| {
1630        // Use to_bytes() to get the actual binary size without JSON serialization overhead
1631        let size = match datum.to_bytes() {
1632            Ok(bytes) => bytes.len(),
1633            Err(_) => 0,
1634        };
1635
1636        if size > MAX_COLUMN_STAT_SIZE {
1637            tracing::debug!(
1638                field_id = field_id,
1639                size = size,
1640                "Truncating large upper_bound statistic"
1641            );
1642            return false;
1643        }
1644        true
1645    });
1646
1647    data_file
1648}
1649
1650#[derive(Default, Clone)]
1651struct IcebergCommitResult {
1652    schema_id: i32,
1653    partition_spec_id: i32,
1654    data_files: Vec<SerializedDataFile>,
1655}
1656
1657impl IcebergCommitResult {
1658    fn try_from(value: &SinkMetadata) -> Result<Self> {
1659        if let Some(Serialized(v)) = &value.metadata {
1660            let mut values = if let serde_json::Value::Object(v) =
1661                serde_json::from_slice::<serde_json::Value>(&v.metadata)
1662                    .context("Can't parse iceberg sink metadata")?
1663            {
1664                v
1665            } else {
1666                bail!("iceberg sink metadata should be an object");
1667            };
1668
1669            let schema_id;
1670            if let Some(serde_json::Value::Number(value)) = values.remove(SCHEMA_ID) {
1671                schema_id = value
1672                    .as_u64()
1673                    .ok_or_else(|| anyhow!("schema_id should be a u64"))?;
1674            } else {
1675                bail!("iceberg sink metadata should have schema_id");
1676            }
1677
1678            let partition_spec_id;
1679            if let Some(serde_json::Value::Number(value)) = values.remove(PARTITION_SPEC_ID) {
1680                partition_spec_id = value
1681                    .as_u64()
1682                    .ok_or_else(|| anyhow!("partition_spec_id should be a u64"))?;
1683            } else {
1684                bail!("iceberg sink metadata should have partition_spec_id");
1685            }
1686
1687            let data_files: Vec<SerializedDataFile>;
1688            if let serde_json::Value::Array(values) = values
1689                .remove(DATA_FILES)
1690                .ok_or_else(|| anyhow!("iceberg sink metadata should have data_files object"))?
1691            {
1692                data_files = values
1693                    .into_iter()
1694                    .map(from_value::<SerializedDataFile>)
1695                    .collect::<std::result::Result<_, _>>()
1696                    .context("Can't parse data files in iceberg sink metadata")?;
1697            } else {
1698                bail!("iceberg sink metadata should have data_files object");
1699            }
1700
1701            Ok(Self {
1702                schema_id: schema_id as i32,
1703                partition_spec_id: partition_spec_id as i32,
1704                data_files,
1705            })
1706        } else {
1707            bail!("Can't create iceberg sink write result from empty data!")
1708        }
1709    }
1710
1711    fn try_from_serialized_bytes(value: Vec<u8>) -> Result<Self> {
1712        let mut values = if let serde_json::Value::Object(value) =
1713            serde_json::from_slice::<serde_json::Value>(&value)
1714                .context("Can't parse iceberg sink metadata")?
1715        {
1716            value
1717        } else {
1718            bail!("iceberg sink metadata should be an object");
1719        };
1720
1721        let schema_id;
1722        if let Some(serde_json::Value::Number(value)) = values.remove(SCHEMA_ID) {
1723            schema_id = value
1724                .as_u64()
1725                .ok_or_else(|| anyhow!("schema_id should be a u64"))?;
1726        } else {
1727            bail!("iceberg sink metadata should have schema_id");
1728        }
1729
1730        let partition_spec_id;
1731        if let Some(serde_json::Value::Number(value)) = values.remove(PARTITION_SPEC_ID) {
1732            partition_spec_id = value
1733                .as_u64()
1734                .ok_or_else(|| anyhow!("partition_spec_id should be a u64"))?;
1735        } else {
1736            bail!("iceberg sink metadata should have partition_spec_id");
1737        }
1738
1739        let data_files: Vec<SerializedDataFile>;
1740        if let serde_json::Value::Array(values) = values
1741            .remove(DATA_FILES)
1742            .ok_or_else(|| anyhow!("iceberg sink metadata should have data_files object"))?
1743        {
1744            data_files = values
1745                .into_iter()
1746                .map(from_value::<SerializedDataFile>)
1747                .collect::<std::result::Result<_, _>>()
1748                .context("Can't parse data files in iceberg sink metadata")?;
1749        } else {
1750            bail!("iceberg sink metadata should have data_files object");
1751        }
1752
1753        Ok(Self {
1754            schema_id: schema_id as i32,
1755            partition_spec_id: partition_spec_id as i32,
1756            data_files,
1757        })
1758    }
1759}
1760
1761impl<'a> TryFrom<&'a IcebergCommitResult> for SinkMetadata {
1762    type Error = SinkError;
1763
1764    fn try_from(value: &'a IcebergCommitResult) -> std::result::Result<SinkMetadata, Self::Error> {
1765        let json_data_files = serde_json::Value::Array(
1766            value
1767                .data_files
1768                .iter()
1769                .map(serde_json::to_value)
1770                .collect::<std::result::Result<Vec<serde_json::Value>, _>>()
1771                .context("Can't serialize data files to json")?,
1772        );
1773        let json_value = serde_json::Value::Object(
1774            vec![
1775                (
1776                    SCHEMA_ID.to_owned(),
1777                    serde_json::Value::Number(value.schema_id.into()),
1778                ),
1779                (
1780                    PARTITION_SPEC_ID.to_owned(),
1781                    serde_json::Value::Number(value.partition_spec_id.into()),
1782                ),
1783                (DATA_FILES.to_owned(), json_data_files),
1784            ]
1785            .into_iter()
1786            .collect(),
1787        );
1788        Ok(SinkMetadata {
1789            metadata: Some(Serialized(SerializedMetadata {
1790                metadata: serde_json::to_vec(&json_value)
1791                    .context("Can't serialize iceberg sink metadata")?,
1792            })),
1793        })
1794    }
1795}
1796
1797impl TryFrom<IcebergCommitResult> for Vec<u8> {
1798    type Error = SinkError;
1799
1800    fn try_from(value: IcebergCommitResult) -> std::result::Result<Vec<u8>, Self::Error> {
1801        let json_data_files = serde_json::Value::Array(
1802            value
1803                .data_files
1804                .iter()
1805                .map(serde_json::to_value)
1806                .collect::<std::result::Result<Vec<serde_json::Value>, _>>()
1807                .context("Can't serialize data files to json")?,
1808        );
1809        let json_value = serde_json::Value::Object(
1810            vec![
1811                (
1812                    SCHEMA_ID.to_owned(),
1813                    serde_json::Value::Number(value.schema_id.into()),
1814                ),
1815                (
1816                    PARTITION_SPEC_ID.to_owned(),
1817                    serde_json::Value::Number(value.partition_spec_id.into()),
1818                ),
1819                (DATA_FILES.to_owned(), json_data_files),
1820            ]
1821            .into_iter()
1822            .collect(),
1823        );
1824        Ok(serde_json::to_vec(&json_value).context("Can't serialize iceberg sink metadata")?)
1825    }
1826}
1827pub struct IcebergSinkCommitter {
1828    catalog: Arc<dyn Catalog>,
1829    table: Table,
1830    pub last_commit_epoch: u64,
1831    pub(crate) sink_id: SinkId,
1832    pub(crate) config: IcebergConfig,
1833    pub(crate) param: SinkParam,
1834    commit_retry_num: u32,
1835    pub(crate) iceberg_compact_stat_sender: Option<UnboundedSender<IcebergSinkCompactionUpdate>>,
1836}
1837
1838impl IcebergSinkCommitter {
1839    /// Reload the table and guarantee that its current schema id and default partition
1840    /// spec id match the given `schema_id` and `partition_spec_id`.
1841    async fn reload_table(
1842        catalog: &dyn Catalog,
1843        table_ident: &TableIdent,
1844        schema_id: i32,
1845        partition_spec_id: i32,
1846    ) -> Result<Table> {
1847        let table = catalog
1848            .load_table(table_ident)
1849            .await
1850            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
1851        if table.metadata().current_schema_id() != schema_id {
1852            return Err(SinkError::Iceberg(anyhow!(
1853                "Schema evolution not supported, expect schema id {}, but got {}",
1854                schema_id,
1855                table.metadata().current_schema_id()
1856            )));
1857        }
1858        if table.metadata().default_partition_spec_id() != partition_spec_id {
1859            return Err(SinkError::Iceberg(anyhow!(
1860                "Partition evolution not supported, expect partition spec id {}, but got {}",
1861                partition_spec_id,
1862                table.metadata().default_partition_spec_id()
1863            )));
1864        }
1865        Ok(table)
1866    }
1867}
1868
1869#[async_trait]
1870impl SinglePhaseCommitCoordinator for IcebergSinkCommitter {
1871    async fn init(&mut self) -> Result<()> {
1872        tracing::info!(
1873            "Sink id = {}: iceberg sink coordinator initializing.",
1874            self.param.sink_id
1875        );
1876
1877        Ok(())
1878    }
1879
1880    async fn commit(
1881        &mut self,
1882        epoch: u64,
1883        metadata: Vec<SinkMetadata>,
1884        add_columns: Option<Vec<Field>>,
1885    ) -> Result<()> {
1886        tracing::info!("Starting iceberg direct commit in epoch {epoch}");
1887
1888        let (write_results, snapshot_id) =
1889            match self.pre_commit_inner(epoch, metadata, add_columns)? {
1890                Some((write_results, snapshot_id)) => (write_results, snapshot_id),
1891                None => {
1892                    tracing::debug!(?epoch, "no data to commit");
1893                    return Ok(());
1894                }
1895            };
1896
1897        self.commit_iceberg_inner(epoch, write_results, snapshot_id)
1898            .await
1899    }
1900}
1901
1902#[async_trait]
1903impl TwoPhaseCommitCoordinator for IcebergSinkCommitter {
1904    async fn init(&mut self) -> Result<()> {
1905        tracing::info!(
1906            "Sink id = {}: iceberg sink coordinator initializing.",
1907            self.param.sink_id
1908        );
1909
1910        Ok(())
1911    }
1912
1913    async fn pre_commit(
1914        &mut self,
1915        epoch: u64,
1916        metadata: Vec<SinkMetadata>,
1917        add_columns: Option<Vec<Field>>,
1918    ) -> Result<Vec<u8>> {
1919        tracing::info!("Starting iceberg pre commit in epoch {epoch}");
1920
1921        let (write_results, snapshot_id) =
1922            match self.pre_commit_inner(epoch, metadata, add_columns)? {
1923                Some((write_results, snapshot_id)) => (write_results, snapshot_id),
1924                None => {
1925                    tracing::debug!(?epoch, "no data to commit");
1926                    return Ok(vec![]);
1927                }
1928            };
1929
1930        let mut write_results_bytes = Vec::new();
1931        for each_parallelism_write_result in write_results {
1932            let each_parallelism_write_result_bytes: Vec<u8> =
1933                each_parallelism_write_result.try_into()?;
1934            write_results_bytes.push(each_parallelism_write_result_bytes);
1935        }
1936
1937        let snapshot_id_bytes: Vec<u8> = snapshot_id.to_le_bytes().to_vec();
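        // Append the snapshot id as the last element (little-endian i64 bytes);
        // `commit` pops it back off before deserializing the per-parallelism results.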
1938        write_results_bytes.push(snapshot_id_bytes);
1939
1940        let pre_commit_metadata_bytes: Vec<u8> = serialize_metadata(write_results_bytes);
1941        Ok(pre_commit_metadata_bytes)
1942    }
1943
1944    async fn commit(&mut self, epoch: u64, commit_metadata: Vec<u8>) -> Result<()> {
1945        tracing::info!("Starting iceberg commit in epoch {epoch}");
1946        if commit_metadata.is_empty() {
1947            tracing::debug!(?epoch, "no data to commit");
1948            return Ok(());
1949        }
1950
1951        let mut write_results_bytes = deserialize_metadata(commit_metadata);
1952
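        // The last element is the little-endian snapshot id appended by `pre_commit`.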
1953        let snapshot_id_bytes = write_results_bytes.pop().ok_or_else(|| SinkError::Iceberg(anyhow!("Empty iceberg commit metadata")))?;
1954        let snapshot_id = i64::from_le_bytes(
1955            snapshot_id_bytes
1956                .try_into()
1957                .map_err(|_| SinkError::Iceberg(anyhow!("Invalid snapshot id bytes")))?,
1958        );
1959
1960        if self
1961            .is_snapshot_id_in_iceberg(&self.config, snapshot_id)
1962            .await?
1963        {
1964            tracing::info!(
1965                "Snapshot id {} already committed in iceberg table, skip committing again.",
1966                snapshot_id
1967            );
1968            return Ok(());
1969        }
1970
1971        let mut write_results = vec![];
1972        for each in write_results_bytes {
1973            let write_result = IcebergCommitResult::try_from_serialized_bytes(each)?;
1974            write_results.push(write_result);
1975        }
1976
1977        self.commit_iceberg_inner(epoch, write_results, snapshot_id)
1978            .await?;
1979
1980        Ok(())
1981    }
1982
1983    async fn abort(&mut self, _epoch: u64, _commit_metadata: Vec<u8>) {
1984        // TODO: Files that have been written but not committed should be deleted.
1985        tracing::debug!("Abort not implemented yet");
1986    }
1987}
1988
1989/// Methods required to achieve exactly-once semantics.
1990impl IcebergSinkCommitter {
1991    fn pre_commit_inner(
1992        &mut self,
1993        _epoch: u64,
1994        metadata: Vec<SinkMetadata>,
1995        add_columns: Option<Vec<Field>>,
1996    ) -> Result<Option<(Vec<IcebergCommitResult>, i64)>> {
1997        if let Some(add_columns) = add_columns {
1998            return Err(anyhow!(
1999                "Iceberg sink does not support adding columns, but got: {:?}",
2000                add_columns
2001            )
2002            .into());
2003        }
2004
2005        let write_results: Vec<IcebergCommitResult> = metadata
2006            .iter()
2007            .map(IcebergCommitResult::try_from)
2008            .collect::<Result<Vec<IcebergCommitResult>>>()?;
2009
2010        // Skip if no data to commit
2011        if write_results.is_empty() || write_results.iter().all(|r| r.data_files.is_empty()) {
2012            return Ok(None);
2013        }
2014
2015        let expect_schema_id = write_results[0].schema_id;
2016        let expect_partition_spec_id = write_results[0].partition_spec_id;
2017
2018        // Guarantee that all write results have the same schema_id and partition_spec_id.
2019        if write_results
2020            .iter()
2021            .any(|r| r.schema_id != expect_schema_id)
2022            || write_results
2023                .iter()
2024                .any(|r| r.partition_spec_id != expect_partition_spec_id)
2025        {
2026            return Err(SinkError::Iceberg(anyhow!(
2027                "schema_id and partition_spec_id should be the same in all write results"
2028            )));
2029        }
2030
2031        let snapshot_id = FastAppendAction::generate_snapshot_id(&self.table);
2032
2033        Ok(Some((write_results, snapshot_id)))
2034    }
2035
2036    async fn commit_iceberg_inner(
2037        &mut self,
2038        epoch: u64,
2039        write_results: Vec<IcebergCommitResult>,
2040        snapshot_id: i64,
2041    ) -> Result<()> {
2042        // Empty write results should be handled before calling this function.
2043        assert!(
2044            !write_results.is_empty() && !write_results.iter().all(|r| r.data_files.is_empty())
2045        );
2046
2047        // Check snapshot limit before proceeding with commit
2048        self.wait_for_snapshot_limit().await?;
2049
2050        let expect_schema_id = write_results[0].schema_id;
2051        let expect_partition_spec_id = write_results[0].partition_spec_id;
2052
2053        // Reload the latest table to reduce the chance of conflicting with concurrent modifications (best effort).
2054        self.table = Self::reload_table(
2055            self.catalog.as_ref(),
2056            self.table.identifier(),
2057            expect_schema_id,
2058            expect_partition_spec_id,
2059        )
2060        .await?;
2061
2062        let Some(schema) = self.table.metadata().schema_by_id(expect_schema_id) else {
2063            return Err(SinkError::Iceberg(anyhow!(
2064                "Can't find schema by id {}",
2065                expect_schema_id
2066            )));
2067        };
2068        let Some(partition_spec) = self
2069            .table
2070            .metadata()
2071            .partition_spec_by_id(expect_partition_spec_id)
2072        else {
2073            return Err(SinkError::Iceberg(anyhow!(
2074                "Can't find partition spec by id {}",
2075                expect_partition_spec_id
2076            )));
2077        };
2078        let partition_type = partition_spec
2079            .as_ref()
2080            .clone()
2081            .partition_type(schema)
2082            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
2083
2084        let data_files = write_results
2085            .into_iter()
2086            .flat_map(|r| {
2087                r.data_files.into_iter().map(|f| {
2088                    f.try_into(expect_partition_spec_id, &partition_type, schema)
2089                        .map_err(|err| SinkError::Iceberg(anyhow!(err)))
2090                })
2091            })
2092            .collect::<Result<Vec<DataFile>>>()?;
2093        // TODO: This retry behavior should be reverted and handled in iceberg-rust once it
2094        // supports retry (tracked in: https://github.com/apache/iceberg-rust/issues/964),
2095        // because the retry logic involves reapplying the commit metadata.
2096        // For now, we just retry the whole commit operation.
2097        let retry_strategy = ExponentialBackoff::from_millis(10)
2098            .max_delay(Duration::from_secs(60))
2099            .map(jitter)
2100            .take(self.commit_retry_num as usize);
2101        let catalog = self.catalog.clone();
2102        let table_ident = self.table.identifier().clone();
2103        let table = Retry::spawn(retry_strategy, || async {
2104            let table = Self::reload_table(
2105                catalog.as_ref(),
2106                &table_ident,
2107                expect_schema_id,
2108                expect_partition_spec_id,
2109            )
2110            .await?;
2111            let txn = Transaction::new(&table);
2112            let append_action = txn
2113                .fast_append()
2114                .set_snapshot_id(snapshot_id)
2115                .set_target_branch(commit_branch(
2116                    self.config.r#type.as_str(),
2117                    self.config.write_mode,
2118                ))
2119                .add_data_files(data_files.clone());
2120
2121            let tx = append_action.apply(txn).map_err(|err| {
2122                let err: IcebergError = err.into();
2123                tracing::error!(error = %err.as_report(), "Failed to apply append action to iceberg transaction");
2124                SinkError::Iceberg(anyhow!(err))
2125            })?;
2126            tx.commit(self.catalog.as_ref()).await.map_err(|err| {
2127                let err: IcebergError = err.into();
2128                tracing::error!(error = %err.as_report(), "Failed to commit iceberg table");
2129                SinkError::Iceberg(anyhow!(err))
2130            })
2131        })
2132        .await?;
2133        self.table = table;
2134
2135        let snapshot_num = self.table.metadata().snapshots().count();
2136        let catalog_name = self.config.common.catalog_name();
2137        let table_name = self.table.identifier().to_string();
2138        let metrics_labels = [&self.param.sink_name, &catalog_name, &table_name];
2139        GLOBAL_SINK_METRICS
2140            .iceberg_snapshot_num
2141            .with_guarded_label_values(&metrics_labels)
2142            .set(snapshot_num as i64);
2143
2144        tracing::info!("Successfully committed to iceberg table in epoch {epoch}.");
2145
2146        if let Some(iceberg_compact_stat_sender) = &self.iceberg_compact_stat_sender
2147            && self.config.enable_compaction
2148            && iceberg_compact_stat_sender
2149                .send(IcebergSinkCompactionUpdate {
2150                    sink_id: self.sink_id,
2151                    compaction_interval: self.config.compaction_interval_sec(),
2152                    force_compaction: false,
2153                })
2154                .is_err()
2155        {
2156            warn!("failed to send iceberg compaction stats");
2157        }
2158
2159        Ok(())
2160    }
2161
2162    /// During pre-commit we record the `snapshot_id` corresponding to each batch of files.
2163    /// Therefore, checking whether all files in a batch are already present in Iceberg
2164    /// reduces to checking whether their corresponding `snapshot_id` exists in the table.
2165    async fn is_snapshot_id_in_iceberg(
2166        &self,
2167        iceberg_config: &IcebergConfig,
2168        snapshot_id: i64,
2169    ) -> Result<bool> {
2170        let table = iceberg_config.load_table().await?;
2171        Ok(table.metadata().snapshot_by_id(snapshot_id).is_some())
2176    }
2177
2178    /// Count the snapshots created since the last rewrite/overwrite (`Replace`) operation.
2179    /// Returns the total snapshot count if no such operation is found.
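    ///
    /// Illustrative example (assumed snapshot history, newest first): the history
    /// `[Append, Append, Replace, Append]` yields 2, since two snapshots were
    /// created after the most recent `Replace`.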
2180    fn count_snapshots_since_rewrite(&self) -> usize {
2181        let mut snapshots: Vec<_> = self.table.metadata().snapshots().collect();
2182        snapshots.sort_by_key(|b| std::cmp::Reverse(b.timestamp_ms()));
2183
2184        // Iterate newest-first to find the most recent rewrite/overwrite operation.
2185        let mut count = 0;
2186        for snapshot in snapshots {
2187            // Check if this snapshot represents a rewrite or overwrite operation
2188            let summary = snapshot.summary();
2189            match &summary.operation {
2190                Operation::Replace => {
2191                    // Found a rewrite/overwrite operation, stop counting
2192                    break;
2193                }
2194
2195                _ => {
2196                    // Increment count for each snapshot that is not a rewrite/overwrite
2197                    count += 1;
2198                }
2199            }
2200        }
2201
2202        count
2203    }
2204
2205    /// Wait until snapshot count since last rewrite is below the limit
2206    async fn wait_for_snapshot_limit(&mut self) -> Result<()> {
2207        if !self.config.enable_compaction {
2208            return Ok(());
2209        }
2210
2211        if let Some(max_snapshots) = self.config.max_snapshots_num_before_compaction {
2212            loop {
2213                let current_count = self.count_snapshots_since_rewrite();
2214
2215                if current_count < max_snapshots {
2216                    tracing::info!(
2217                        "Snapshot count check passed: {} < {}",
2218                        current_count,
2219                        max_snapshots
2220                    );
2221                    break;
2222                }
2223
2224                tracing::info!(
2225                    "Snapshot count {} exceeds limit {}, waiting...",
2226                    current_count,
2227                    max_snapshots
2228                );
2229
2230                if let Some(iceberg_compact_stat_sender) = &self.iceberg_compact_stat_sender
2231                    && iceberg_compact_stat_sender
2232                        .send(IcebergSinkCompactionUpdate {
2233                            sink_id: self.sink_id,
2234                            compaction_interval: self.config.compaction_interval_sec(),
2235                            force_compaction: true,
2236                        })
2237                        .is_err()
2238                {
2239                    tracing::warn!("failed to send iceberg compaction stats");
2240                }
2241
2242                // Wait for 30 seconds before checking again
2243                tokio::time::sleep(Duration::from_secs(30)).await;
2244
2245                // Reload table to get latest snapshots
2246                self.table = self.config.load_table().await?;
2247            }
2248        }
2249        Ok(())
2250    }
2251}
2252
2253const MAP_KEY: &str = "key";
2254const MAP_VALUE: &str = "value";
2255
2256fn get_fields<'a>(
2257    our_field_type: &'a risingwave_common::types::DataType,
2258    data_type: &ArrowDataType,
2259    schema_fields: &mut HashMap<&'a str, &'a risingwave_common::types::DataType>,
2260) -> Option<ArrowFields> {
2261    match data_type {
2262        ArrowDataType::Struct(fields) => {
2263            match our_field_type {
2264                risingwave_common::types::DataType::Struct(struct_fields) => {
2265                    struct_fields.iter().for_each(|(name, data_type)| {
2266                        let res = schema_fields.insert(name, data_type);
2267                        // This assert is to make sure there is no duplicate field name in the schema.
2268                        assert!(res.is_none())
2269                    });
2270                }
2271                risingwave_common::types::DataType::Map(map_fields) => {
2272                    schema_fields.insert(MAP_KEY, map_fields.key());
2273                    schema_fields.insert(MAP_VALUE, map_fields.value());
2274                }
2275                risingwave_common::types::DataType::List(list) => {
2276                    list.elem()
2277                        .as_struct()
2278                        .iter()
2279                        .for_each(|(name, data_type)| {
2280                            let res = schema_fields.insert(name, data_type);
2281                            // This assert is to make sure there is no duplicate field name in the schema.
2282                            assert!(res.is_none())
2283                        });
2284                }
2285                _ => {}
2286            };
2287            Some(fields.clone())
2288        }
2289        ArrowDataType::List(field) | ArrowDataType::Map(field, _) => {
2290            get_fields(our_field_type, field.data_type(), schema_fields)
2291        }
2292        _ => None, // not a supported complex type and unlikely to show up
2293    }
2294}
2295
2296fn check_compatibility(
2297    schema_fields: HashMap<&str, &risingwave_common::types::DataType>,
2298    fields: &ArrowFields,
2299) -> anyhow::Result<bool> {
2300    for arrow_field in fields {
2301        let our_field_type = schema_fields
2302            .get(arrow_field.name().as_str())
2303            .ok_or_else(|| anyhow!("Field {} not found in our schema", arrow_field.name()))?;
2304
2305        // Iceberg source should be able to read iceberg decimal type.
2306        let converted_arrow_data_type = IcebergArrowConvert
2307            .to_arrow_field("", our_field_type)
2308            .map_err(|e| anyhow!(e))?
2309            .data_type()
2310            .clone();
2311
2312        let compatible = match (&converted_arrow_data_type, arrow_field.data_type()) {
2313            (ArrowDataType::Decimal128(_, _), ArrowDataType::Decimal128(_, _)) => true,
2314            (ArrowDataType::Binary, ArrowDataType::LargeBinary) => true,
2315            (ArrowDataType::LargeBinary, ArrowDataType::Binary) => true,
2316            (ArrowDataType::List(_), ArrowDataType::List(field))
2317            | (ArrowDataType::Map(_, _), ArrowDataType::Map(field, _)) => {
2318                let mut schema_fields = HashMap::new();
2319                get_fields(our_field_type, field.data_type(), &mut schema_fields)
2320                    .is_none_or(|fields| check_compatibility(schema_fields, &fields).unwrap())
2321            }
2322            // validate nested structs
2323            (ArrowDataType::Struct(_), ArrowDataType::Struct(fields)) => {
2324                let mut schema_fields = HashMap::new();
2325                our_field_type
2326                    .as_struct()
2327                    .iter()
2328                    .for_each(|(name, data_type)| {
2329                        let res = schema_fields.insert(name, data_type);
2330                        // This assert is to make sure there is no duplicate field name in the schema.
2331                        assert!(res.is_none())
2332                    });
2333                check_compatibility(schema_fields, fields)?
2334            }
2335            // cases where left != right (metadata, field name mismatch)
2336            //
2337            // all nested types: in iceberg `field_id` will always be present, but RW doesn't have it:
2338            // {"PARQUET:field_id": ".."}
2339            //
2340            // map: The standard name in arrow is "entries", "key", "value".
2341            // in iceberg-rs, it's called "key_value"
2342            (left, right) => left.equals_datatype(right),
2343        };
2344        if !compatible {
2345            bail!(
2346                "field {}'s type is incompatible\nRisingWave converted data type: {}\niceberg's data type: {}",
2347                arrow_field.name(),
2348                converted_arrow_data_type,
2349                arrow_field.data_type()
2350            );
2351        }
2352    }
2353    Ok(true)
2354}
2355
2356/// Try to match our schema with iceberg schema.
2357pub fn try_matches_arrow_schema(rw_schema: &Schema, arrow_schema: &ArrowSchema) -> Result<()> {
2358    if rw_schema.fields.len() != arrow_schema.fields().len() {
2359        bail!(
2360            "Schema field count mismatch: risingwave has {}, iceberg has {}",
2361            rw_schema.fields.len(),
2362            arrow_schema.fields.len()
2363        );
2364    }
2365
2366    let mut schema_fields = HashMap::new();
2367    rw_schema.fields.iter().for_each(|field| {
2368        let res = schema_fields.insert(field.name.as_str(), &field.data_type);
2369        // This assert is to make sure there is no duplicate field name in the schema.
2370        assert!(res.is_none())
2371    });
2372
2373    check_compatibility(schema_fields, &arrow_schema.fields)?;
2374    Ok(())
2375}
2376
2377pub fn serialize_metadata(metadata: Vec<Vec<u8>>) -> Vec<u8> {
2378    serde_json::to_vec(&metadata).unwrap()
2379}
2380
2381pub fn deserialize_metadata(bytes: Vec<u8>) -> Vec<Vec<u8>> {
2382    serde_json::from_slice(&bytes).unwrap()
2383}
2384
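/// Parse a `partition_by` clause into `(column, Transform)` pairs.
///
/// Illustrative sketch of the expected output (not a compiled doctest; the assertion
/// assumes `Transform` comparison purely for illustration):
///
/// ```ignore
/// let parts = parse_partition_by_exprs("bucket(5,v1), year(v3), v2".to_owned())?;
/// assert_eq!(parts, vec![
///     ("v1".to_owned(), Transform::Bucket(5)),
///     ("v3".to_owned(), Transform::Year),
///     ("v2".to_owned(), Transform::Identity),
/// ]);
/// ```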
2385pub fn parse_partition_by_exprs(
2386    expr: String,
2387) -> std::result::Result<Vec<(String, Transform)>, anyhow::Error> {
2388    // captures column, transform(column), transform(n,column), transform(n, column)
2389    let re = Regex::new(r"(?<transform>\w+)(\(((?<n>\d+)?(?:,|(,\s)))?(?<field>\w+)\))?").unwrap();
2390    if !re.is_match(&expr) {
2391        bail!(format!(
2392            "Invalid partition fields: {}\nHINT: Supported formats are column, transform(column), transform(n,column), transform(n, column)",
2393            expr
2394        ))
2395    }
2396    let caps = re.captures_iter(&expr);
2397
2398    let mut partition_columns = vec![];
2399
2400    for mat in caps {
2401        let (column, transform) = if mat.name("n").is_none() && mat.name("field").is_none() {
2402            (&mat["transform"], Transform::Identity)
2403        } else {
2404            let mut func = mat["transform"].to_owned();
2405            if func == "bucket" || func == "truncate" {
2406                let n = &mat
2407                    .name("n")
2408                    .ok_or_else(|| anyhow!("The `n` must be set with `bucket` and `truncate`"))?
2409                    .as_str();
2410                func = format!("{func}[{n}]");
2411            }
2412            (
2413                &mat["field"],
2414                Transform::from_str(&func)
2415                    .with_context(|| format!("invalid transform function {}", func))?,
2416            )
2417        };
2418        partition_columns.push((column.to_owned(), transform));
2419    }
2420    Ok(partition_columns)
2421}
2422
2423pub fn commit_branch(sink_type: &str, write_mode: IcebergWriteMode) -> String {
2424    if should_enable_iceberg_cow(sink_type, write_mode) {
2425        ICEBERG_COW_BRANCH.to_owned()
2426    } else {
2427        MAIN_BRANCH.to_owned()
2428    }
2429}
2430
2431pub fn should_enable_iceberg_cow(sink_type: &str, write_mode: IcebergWriteMode) -> bool {
2432    sink_type == SINK_TYPE_UPSERT && write_mode == IcebergWriteMode::CopyOnWrite
2433}
2434
2435impl crate::with_options::WithOptions for IcebergWriteMode {}
2436
2437impl crate::with_options::WithOptions for CompactionType {}
2438
2439#[cfg(test)]
2440mod test {
2441    use std::collections::BTreeMap;
2442
2443    use risingwave_common::array::arrow::arrow_schema_iceberg::FieldRef as ArrowFieldRef;
2444    use risingwave_common::types::{DataType, MapType, StructType};
2445
2446    use crate::connector_common::{IcebergCommon, IcebergTableIdentifier};
2447    use crate::sink::decouple_checkpoint_log_sink::ICEBERG_DEFAULT_COMMIT_CHECKPOINT_INTERVAL;
2448    use crate::sink::iceberg::{
2449        COMPACTION_INTERVAL_SEC, COMPACTION_MAX_SNAPSHOTS_NUM, CompactionType, ENABLE_COMPACTION,
2450        ENABLE_SNAPSHOT_EXPIRATION, IcebergConfig, IcebergWriteMode,
2451        SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_FILES, SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_META_DATA,
2452        SNAPSHOT_EXPIRATION_MAX_AGE_MILLIS, SNAPSHOT_EXPIRATION_RETAIN_LAST, WRITE_MODE,
2453    };
2454
2455    pub const DEFAULT_ICEBERG_COMPACTION_INTERVAL: u64 = 3600; // 1 hour
2456
2457    #[test]
2458    fn test_compatible_arrow_schema() {
2459        use arrow_schema_iceberg::{DataType as ArrowDataType, Field as ArrowField};
2460
2461        use super::*;
2462        let risingwave_schema = Schema::new(vec![
2463            Field::with_name(DataType::Int32, "a"),
2464            Field::with_name(DataType::Int32, "b"),
2465            Field::with_name(DataType::Int32, "c"),
2466        ]);
2467        let arrow_schema = ArrowSchema::new(vec![
2468            ArrowField::new("a", ArrowDataType::Int32, false),
2469            ArrowField::new("b", ArrowDataType::Int32, false),
2470            ArrowField::new("c", ArrowDataType::Int32, false),
2471        ]);
2472
2473        try_matches_arrow_schema(&risingwave_schema, &arrow_schema).unwrap();
2474
2475        let risingwave_schema = Schema::new(vec![
2476            Field::with_name(DataType::Int32, "d"),
2477            Field::with_name(DataType::Int32, "c"),
2478            Field::with_name(DataType::Int32, "a"),
2479            Field::with_name(DataType::Int32, "b"),
2480        ]);
2481        let arrow_schema = ArrowSchema::new(vec![
2482            ArrowField::new("a", ArrowDataType::Int32, false),
2483            ArrowField::new("b", ArrowDataType::Int32, false),
2484            ArrowField::new("d", ArrowDataType::Int32, false),
2485            ArrowField::new("c", ArrowDataType::Int32, false),
2486        ]);
2487        try_matches_arrow_schema(&risingwave_schema, &arrow_schema).unwrap();
2488
2489        let risingwave_schema = Schema::new(vec![
2490            Field::with_name(
2491                DataType::Struct(StructType::new(vec![
2492                    ("a1", DataType::Int32),
2493                    (
2494                        "a2",
2495                        DataType::Struct(StructType::new(vec![
2496                            ("a21", DataType::Bytea),
2497                            (
2498                                "a22",
2499                                DataType::Map(MapType::from_kv(DataType::Varchar, DataType::Jsonb)),
2500                            ),
2501                        ])),
2502                    ),
2503                ])),
2504                "a",
2505            ),
2506            Field::with_name(
2507                DataType::list(DataType::Struct(StructType::new(vec![
2508                    ("b1", DataType::Int32),
2509                    ("b2", DataType::Bytea),
2510                    (
2511                        "b3",
2512                        DataType::Map(MapType::from_kv(DataType::Varchar, DataType::Jsonb)),
2513                    ),
2514                ]))),
2515                "b",
2516            ),
2517            Field::with_name(
2518                DataType::Map(MapType::from_kv(
2519                    DataType::Varchar,
2520                    DataType::list(DataType::Struct(StructType::new([
2521                        ("c1", DataType::Int32),
2522                        ("c2", DataType::Bytea),
2523                        (
2524                            "c3",
2525                            DataType::Map(MapType::from_kv(DataType::Varchar, DataType::Jsonb)),
2526                        ),
2527                    ]))),
2528                )),
2529                "c",
2530            ),
2531        ]);
2532        let arrow_schema = ArrowSchema::new(vec![
2533            ArrowField::new(
2534                "a",
2535                ArrowDataType::Struct(ArrowFields::from(vec![
2536                    ArrowField::new("a1", ArrowDataType::Int32, false),
2537                    ArrowField::new(
2538                        "a2",
2539                        ArrowDataType::Struct(ArrowFields::from(vec![
2540                            ArrowField::new("a21", ArrowDataType::LargeBinary, false),
2541                            ArrowField::new_map(
2542                                "a22",
2543                                "entries",
2544                                ArrowFieldRef::new(ArrowField::new(
2545                                    "key",
2546                                    ArrowDataType::Utf8,
2547                                    false,
2548                                )),
2549                                ArrowFieldRef::new(ArrowField::new(
2550                                    "value",
2551                                    ArrowDataType::Utf8,
2552                                    false,
2553                                )),
2554                                false,
2555                                false,
2556                            ),
2557                        ])),
2558                        false,
2559                    ),
2560                ])),
2561                false,
2562            ),
2563            ArrowField::new(
2564                "b",
2565                ArrowDataType::List(ArrowFieldRef::new(ArrowField::new_list_field(
2566                    ArrowDataType::Struct(ArrowFields::from(vec![
2567                        ArrowField::new("b1", ArrowDataType::Int32, false),
2568                        ArrowField::new("b2", ArrowDataType::LargeBinary, false),
2569                        ArrowField::new_map(
2570                            "b3",
2571                            "entries",
2572                            ArrowFieldRef::new(ArrowField::new("key", ArrowDataType::Utf8, false)),
2573                            ArrowFieldRef::new(ArrowField::new(
2574                                "value",
2575                                ArrowDataType::Utf8,
2576                                false,
2577                            )),
2578                            false,
2579                            false,
2580                        ),
2581                    ])),
2582                    false,
2583                ))),
2584                false,
2585            ),
2586            ArrowField::new_map(
2587                "c",
2588                "entries",
2589                ArrowFieldRef::new(ArrowField::new("key", ArrowDataType::Utf8, false)),
2590                ArrowFieldRef::new(ArrowField::new(
2591                    "value",
2592                    ArrowDataType::List(ArrowFieldRef::new(ArrowField::new_list_field(
2593                        ArrowDataType::Struct(ArrowFields::from(vec![
2594                            ArrowField::new("c1", ArrowDataType::Int32, false),
2595                            ArrowField::new("c2", ArrowDataType::LargeBinary, false),
2596                            ArrowField::new_map(
2597                                "c3",
2598                                "entries",
2599                                ArrowFieldRef::new(ArrowField::new(
2600                                    "key",
2601                                    ArrowDataType::Utf8,
2602                                    false,
2603                                )),
2604                                ArrowFieldRef::new(ArrowField::new(
2605                                    "value",
2606                                    ArrowDataType::Utf8,
2607                                    false,
2608                                )),
2609                                false,
2610                                false,
2611                            ),
2612                        ])),
2613                        false,
2614                    ))),
2615                    false,
2616                )),
2617                false,
2618                false,
2619            ),
2620        ]);
2621        try_matches_arrow_schema(&risingwave_schema, &arrow_schema).unwrap();
2622    }
2623
2624    #[test]
2625    fn test_parse_iceberg_config() {
2626        let values = [
2627            ("connector", "iceberg"),
2628            ("type", "upsert"),
2629            ("primary_key", "v1"),
2630            ("partition_by", "v1, identity(v1), truncate(4,v2), bucket(5,v1), year(v3), month(v4), day(v5), hour(v6), void(v1)"),
2631            ("warehouse.path", "s3://iceberg"),
2632            ("s3.endpoint", "http://127.0.0.1:9301"),
2633            ("s3.access.key", "hummockadmin"),
2634            ("s3.secret.key", "hummockadmin"),
2635            ("s3.path.style.access", "true"),
2636            ("s3.region", "us-east-1"),
2637            ("catalog.type", "jdbc"),
2638            ("catalog.name", "demo"),
2639            ("catalog.uri", "jdbc://postgresql://postgres:5432/iceberg"),
2640            ("catalog.jdbc.user", "admin"),
2641            ("catalog.jdbc.password", "123456"),
2642            ("database.name", "demo_db"),
2643            ("table.name", "demo_table"),
2644            ("enable_compaction", "true"),
2645            ("compaction_interval_sec", "1800"),
2646            ("enable_snapshot_expiration", "true"),
2647        ]
2648        .into_iter()
2649        .map(|(k, v)| (k.to_owned(), v.to_owned()))
2650        .collect();
2651
2652        let iceberg_config = IcebergConfig::from_btreemap(values).unwrap();
2653
2654        let expected_iceberg_config = IcebergConfig {
2655            common: IcebergCommon {
2656                warehouse_path: Some("s3://iceberg".to_owned()),
2657                catalog_uri: Some("jdbc://postgresql://postgres:5432/iceberg".to_owned()),
2658                s3_region: Some("us-east-1".to_owned()),
2659                s3_endpoint: Some("http://127.0.0.1:9301".to_owned()),
2660                s3_access_key: Some("hummockadmin".to_owned()),
2661                s3_secret_key: Some("hummockadmin".to_owned()),
2662                s3_iam_role_arn: None,
2663                gcs_credential: None,
2664                catalog_type: Some("jdbc".to_owned()),
2665                glue_id: None,
2666                glue_region: None,
2667                glue_access_key: None,
2668                glue_secret_key: None,
2669                glue_iam_role_arn: None,
2670                catalog_name: Some("demo".to_owned()),
2671                s3_path_style_access: Some(true),
2672                catalog_credential: None,
2673                catalog_oauth2_server_uri: None,
2674                catalog_scope: None,
2675                catalog_token: None,
2676                enable_config_load: None,
2677                rest_signing_name: None,
2678                rest_signing_region: None,
2679                rest_sigv4_enabled: None,
2680                hosted_catalog: None,
2681                azblob_account_name: None,
2682                azblob_account_key: None,
2683                azblob_endpoint_url: None,
2684                catalog_header: None,
2685                adlsgen2_account_name: None,
2686                adlsgen2_account_key: None,
2687                adlsgen2_endpoint: None,
2688                vended_credentials: None,
2689            },
2690            table: IcebergTableIdentifier {
2691                database_name: Some("demo_db".to_owned()),
2692                table_name: "demo_table".to_owned(),
2693            },
2694            r#type: "upsert".to_owned(),
2695            force_append_only: false,
2696            primary_key: Some(vec!["v1".to_owned()]),
2697            partition_by: Some("v1, identity(v1), truncate(4,v2), bucket(5,v1), year(v3), month(v4), day(v5), hour(v6), void(v1)".to_owned()),
2698            java_catalog_props: [("jdbc.user", "admin"), ("jdbc.password", "123456")]
2699                .into_iter()
2700                .map(|(k, v)| (k.to_owned(), v.to_owned()))
2701                .collect(),
2702            commit_checkpoint_interval: ICEBERG_DEFAULT_COMMIT_CHECKPOINT_INTERVAL,
2703            create_table_if_not_exists: false,
2704            is_exactly_once: Some(true),
2705            commit_retry_num: 8,
2706            enable_compaction: true,
2707            compaction_interval_sec: Some(DEFAULT_ICEBERG_COMPACTION_INTERVAL / 2),
2708            enable_snapshot_expiration: true,
2709            write_mode: IcebergWriteMode::MergeOnRead,
2710            snapshot_expiration_max_age_millis: None,
2711            snapshot_expiration_retain_last: None,
2712            snapshot_expiration_clear_expired_files: true,
2713            snapshot_expiration_clear_expired_meta_data: true,
2714            max_snapshots_num_before_compaction: None,
2715            small_files_threshold_mb: None,
2716            delete_files_count_threshold: None,
2717            trigger_snapshot_count: None,
2718            target_file_size_mb: None,
2719            compaction_type: None,
2720        };
2721
2722        assert_eq!(iceberg_config, expected_iceberg_config);
2723
2724        assert_eq!(
2725            &iceberg_config.full_table_name().unwrap().to_string(),
2726            "demo_db.demo_table"
2727        );
2728    }
2729
2730    async fn test_create_catalog(configs: BTreeMap<String, String>) {
2731        let iceberg_config = IcebergConfig::from_btreemap(configs).unwrap();
2732
2733        let _table = iceberg_config.load_table().await.unwrap();
2734    }

    #[tokio::test]
    #[ignore]
    async fn test_storage_catalog() {
        let values = [
            ("connector", "iceberg"),
            ("type", "append-only"),
            ("force_append_only", "true"),
            ("s3.endpoint", "http://127.0.0.1:9301"),
            ("s3.access.key", "hummockadmin"),
            ("s3.secret.key", "hummockadmin"),
            ("s3.region", "us-east-1"),
            ("s3.path.style.access", "true"),
            ("catalog.name", "demo"),
            ("catalog.type", "storage"),
            ("warehouse.path", "s3://icebergdata/demo"),
            ("database.name", "s1"),
            ("table.name", "t1"),
        ]
        .into_iter()
        .map(|(k, v)| (k.to_owned(), v.to_owned()))
        .collect();

        test_create_catalog(values).await;
    }

    #[tokio::test]
    #[ignore]
    async fn test_rest_catalog() {
        let values = [
            ("connector", "iceberg"),
            ("type", "append-only"),
            ("force_append_only", "true"),
            ("s3.endpoint", "http://127.0.0.1:9301"),
            ("s3.access.key", "hummockadmin"),
            ("s3.secret.key", "hummockadmin"),
            ("s3.region", "us-east-1"),
            ("s3.path.style.access", "true"),
            ("catalog.name", "demo"),
            ("catalog.type", "rest"),
            ("catalog.uri", "http://192.168.167.4:8181"),
            ("warehouse.path", "s3://icebergdata/demo"),
            ("database.name", "s1"),
            ("table.name", "t1"),
        ]
        .into_iter()
        .map(|(k, v)| (k.to_owned(), v.to_owned()))
        .collect();

        test_create_catalog(values).await;
    }

    #[tokio::test]
    #[ignore]
    async fn test_jdbc_catalog() {
        let values = [
            ("connector", "iceberg"),
            ("type", "append-only"),
            ("force_append_only", "true"),
            ("s3.endpoint", "http://127.0.0.1:9301"),
            ("s3.access.key", "hummockadmin"),
            ("s3.secret.key", "hummockadmin"),
            ("s3.region", "us-east-1"),
            ("s3.path.style.access", "true"),
            ("catalog.name", "demo"),
            ("catalog.type", "jdbc"),
            ("catalog.uri", "jdbc:postgresql://localhost:5432/iceberg"),
            ("catalog.jdbc.user", "admin"),
            ("catalog.jdbc.password", "123456"),
            ("warehouse.path", "s3://icebergdata/demo"),
            ("database.name", "s1"),
            ("table.name", "t1"),
        ]
        .into_iter()
        .map(|(k, v)| (k.to_owned(), v.to_owned()))
        .collect();

        test_create_catalog(values).await;
    }

    #[tokio::test]
    #[ignore]
    async fn test_hive_catalog() {
        let values = [
            ("connector", "iceberg"),
            ("type", "append-only"),
            ("force_append_only", "true"),
            ("s3.endpoint", "http://127.0.0.1:9301"),
            ("s3.access.key", "hummockadmin"),
            ("s3.secret.key", "hummockadmin"),
            ("s3.region", "us-east-1"),
            ("s3.path.style.access", "true"),
            ("catalog.name", "demo"),
            ("catalog.type", "hive"),
            ("catalog.uri", "thrift://localhost:9083"),
            ("warehouse.path", "s3://icebergdata/demo"),
            ("database.name", "s1"),
            ("table.name", "t1"),
        ]
        .into_iter()
        .map(|(k, v)| (k.to_owned(), v.to_owned()))
        .collect();

        test_create_catalog(values).await;
    }
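
    // Hedged sketch: a Glue-backed variant of the catalog tests above. It mirrors the
    // storage/rest/jdbc/hive cases and assumes `glue` is an accepted `catalog.type`
    // value; marked #[ignore] like its siblings because it needs a live catalog.
    #[tokio::test]
    #[ignore]
    async fn test_glue_catalog() {
        let values = [
            ("connector", "iceberg"),
            ("type", "append-only"),
            ("force_append_only", "true"),
            ("s3.access.key", "hummockadmin"),
            ("s3.secret.key", "hummockadmin"),
            ("s3.region", "us-east-1"),
            ("catalog.name", "demo"),
            ("catalog.type", "glue"),
            ("warehouse.path", "s3://icebergdata/demo"),
            ("database.name", "s1"),
            ("table.name", "t1"),
        ]
        .into_iter()
        .map(|(k, v)| (k.to_owned(), v.to_owned()))
        .collect();

        test_create_catalog(values).await;
    }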

    #[test]
    fn test_config_constants_consistency() {
        // This test ensures the constants match the expected configuration key names.
        // If you change a constant, this test reminds you to update both places.
        assert_eq!(ENABLE_COMPACTION, "enable_compaction");
        assert_eq!(COMPACTION_INTERVAL_SEC, "compaction_interval_sec");
        assert_eq!(ENABLE_SNAPSHOT_EXPIRATION, "enable_snapshot_expiration");
        assert_eq!(WRITE_MODE, "write_mode");
        assert_eq!(
            SNAPSHOT_EXPIRATION_RETAIN_LAST,
            "snapshot_expiration_retain_last"
        );
        assert_eq!(
            SNAPSHOT_EXPIRATION_MAX_AGE_MILLIS,
            "snapshot_expiration_max_age_millis"
        );
        assert_eq!(
            SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_FILES,
            "snapshot_expiration_clear_expired_files"
        );
        assert_eq!(
            SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_META_DATA,
            "snapshot_expiration_clear_expired_meta_data"
        );
        assert_eq!(COMPACTION_MAX_SNAPSHOTS_NUM, "compaction.max_snapshots_num");
    }
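
    // Hedged sketch: exercise the snapshot-expiration keys whose names are pinned by the
    // constants test above. The exact field types are assumptions inferred from the
    // expected-config literal earlier in this module (Option-typed integers and plain bools).
    #[test]
    fn test_parse_iceberg_snapshot_expiration_config() {
        let values = [
            ("connector", "iceberg"),
            ("type", "upsert"),
            ("primary_key", "id"),
            ("warehouse.path", "s3://iceberg"),
            ("s3.endpoint", "http://127.0.0.1:9301"),
            ("s3.access.key", "test"),
            ("s3.secret.key", "test"),
            ("s3.region", "us-east-1"),
            ("catalog.type", "storage"),
            ("catalog.name", "demo"),
            ("database.name", "test_db"),
            ("table.name", "test_table"),
            ("enable_snapshot_expiration", "true"),
            ("snapshot_expiration_retain_last", "5"),
            ("snapshot_expiration_max_age_millis", "86400000"),
            ("snapshot_expiration_clear_expired_files", "true"),
            ("snapshot_expiration_clear_expired_meta_data", "false"),
        ]
        .into_iter()
        .map(|(k, v)| (k.to_owned(), v.to_owned()))
        .collect();

        let iceberg_config = IcebergConfig::from_btreemap(values).unwrap();

        assert!(iceberg_config.enable_snapshot_expiration);
        assert_eq!(iceberg_config.snapshot_expiration_retain_last, Some(5));
        assert_eq!(
            iceberg_config.snapshot_expiration_max_age_millis,
            Some(86_400_000)
        );
        assert!(iceberg_config.snapshot_expiration_clear_expired_files);
        assert!(!iceberg_config.snapshot_expiration_clear_expired_meta_data);
    }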

    #[test]
    fn test_parse_iceberg_compaction_config() {
        // Test parsing of the `compaction.*`-prefixed config names.
        let values = [
            ("connector", "iceberg"),
            ("type", "upsert"),
            ("primary_key", "id"),
            ("warehouse.path", "s3://iceberg"),
            ("s3.endpoint", "http://127.0.0.1:9301"),
            ("s3.access.key", "test"),
            ("s3.secret.key", "test"),
            ("s3.region", "us-east-1"),
            ("catalog.type", "storage"),
            ("catalog.name", "demo"),
            ("database.name", "test_db"),
            ("table.name", "test_table"),
            ("enable_compaction", "true"),
            ("compaction.max_snapshots_num", "100"),
            ("compaction.small_files_threshold_mb", "512"),
            ("compaction.delete_files_count_threshold", "50"),
            ("compaction.trigger_snapshot_count", "10"),
            ("compaction.target_file_size_mb", "256"),
            ("compaction.type", "full"),
        ]
        .into_iter()
        .map(|(k, v)| (k.to_owned(), v.to_owned()))
        .collect();

        let iceberg_config = IcebergConfig::from_btreemap(values).unwrap();

        // Verify all compaction config fields are parsed correctly.
        assert!(iceberg_config.enable_compaction);
        assert_eq!(
            iceberg_config.max_snapshots_num_before_compaction,
            Some(100)
        );
        assert_eq!(iceberg_config.small_files_threshold_mb, Some(512));
        assert_eq!(iceberg_config.delete_files_count_threshold, Some(50));
        assert_eq!(iceberg_config.trigger_snapshot_count, Some(10));
        assert_eq!(iceberg_config.target_file_size_mb, Some(256));
        assert_eq!(iceberg_config.compaction_type, Some(CompactionType::Full));
    }
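
    // Hedged sketch of the complementary case: when no `compaction.*` keys are supplied,
    // the optional thresholds are expected to stay unset. This assumes the Option-typed
    // fields default to None when their keys are absent, matching the Nones in the
    // expected-config literal earlier in this module.
    #[test]
    fn test_parse_iceberg_compaction_config_defaults() {
        let values = [
            ("connector", "iceberg"),
            ("type", "upsert"),
            ("primary_key", "id"),
            ("warehouse.path", "s3://iceberg"),
            ("s3.endpoint", "http://127.0.0.1:9301"),
            ("s3.access.key", "test"),
            ("s3.secret.key", "test"),
            ("s3.region", "us-east-1"),
            ("catalog.type", "storage"),
            ("catalog.name", "demo"),
            ("database.name", "test_db"),
            ("table.name", "test_table"),
        ]
        .into_iter()
        .map(|(k, v)| (k.to_owned(), v.to_owned()))
        .collect();

        let iceberg_config = IcebergConfig::from_btreemap(values).unwrap();

        assert_eq!(iceberg_config.max_snapshots_num_before_compaction, None);
        assert_eq!(iceberg_config.small_files_threshold_mb, None);
        assert_eq!(iceberg_config.delete_files_count_threshold, None);
        assert_eq!(iceberg_config.trigger_snapshot_count, None);
        assert_eq!(iceberg_config.target_file_size_mb, None);
        assert_eq!(iceberg_config.compaction_type, None);
    }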
}