risingwave_connector/sink/iceberg/mod.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod prometheus;
16use std::collections::{BTreeMap, HashMap};
17use std::fmt::Debug;
18use std::num::NonZeroU64;
19use std::str::FromStr;
20use std::sync::Arc;
21use std::time::Duration;
22
23use anyhow::{Context, anyhow};
24use async_trait::async_trait;
25use await_tree::InstrumentAwait;
26use iceberg::arrow::{
27    RecordBatchPartitionSplitter, arrow_schema_to_schema, schema_to_arrow_schema,
28};
29use iceberg::spec::{
30    DataFile, MAIN_BRANCH, Operation, PartitionSpecRef, SchemaRef as IcebergSchemaRef,
31    SerializedDataFile, Transform, UnboundPartitionField, UnboundPartitionSpec,
32};
33use iceberg::table::Table;
34use iceberg::transaction::{ApplyTransactionAction, FastAppendAction, Transaction};
35use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
36use iceberg::writer::base_writer::equality_delete_writer::{
37    EqualityDeleteFileWriterBuilder, EqualityDeleteWriterConfig,
38};
39use iceberg::writer::base_writer::position_delete_file_writer::{
40    POSITION_DELETE_SCHEMA, PositionDeleteFileWriterBuilder,
41};
42use iceberg::writer::delta_writer::{DELETE_OP, DeltaWriterBuilder, INSERT_OP};
43use iceberg::writer::file_writer::ParquetWriterBuilder;
44use iceberg::writer::file_writer::location_generator::{
45    DefaultFileNameGenerator, DefaultLocationGenerator,
46};
47use iceberg::writer::file_writer::rolling_writer::RollingFileWriterBuilder;
48use iceberg::writer::task_writer::TaskWriter;
49use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
50use iceberg::{Catalog, NamespaceIdent, TableCreation, TableIdent};
51use itertools::Itertools;
52use parquet::file::properties::WriterProperties;
53use prometheus::monitored_general_writer::MonitoredGeneralWriterBuilder;
54use regex::Regex;
55use risingwave_common::array::arrow::arrow_array_iceberg::{Int32Array, RecordBatch};
56use risingwave_common::array::arrow::arrow_schema_iceberg::{
57    self, DataType as ArrowDataType, Field as ArrowField, Fields as ArrowFields,
58    Schema as ArrowSchema, SchemaRef,
59};
60use risingwave_common::array::arrow::{IcebergArrowConvert, IcebergCreateTableArrowConvert};
61use risingwave_common::array::{Op, StreamChunk};
62use risingwave_common::bail;
63use risingwave_common::bitmap::Bitmap;
64use risingwave_common::catalog::{Field, Schema};
65use risingwave_common::error::IcebergError;
66use risingwave_common::metrics::{LabelGuardedHistogram, LabelGuardedIntCounter};
67use risingwave_common_estimate_size::EstimateSize;
68use risingwave_pb::connector_service::SinkMetadata;
69use risingwave_pb::connector_service::sink_metadata::Metadata::Serialized;
70use risingwave_pb::connector_service::sink_metadata::SerializedMetadata;
71use risingwave_pb::stream_plan::PbSinkSchemaChange;
72use serde::{Deserialize, Serialize};
73use serde_json::from_value;
74use serde_with::{DisplayFromStr, serde_as};
75use thiserror_ext::AsReport;
76use tokio::sync::mpsc::UnboundedSender;
77use tokio_retry::RetryIf;
78use tokio_retry::strategy::{ExponentialBackoff, jitter};
79use tracing::warn;
80use url::Url;
81use uuid::Uuid;
82use with_options::WithOptions;
83
84use super::decouple_checkpoint_log_sink::iceberg_default_commit_checkpoint_interval;
85use super::{
86    GLOBAL_SINK_METRICS, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT, Sink,
87    SinkError, SinkWriterParam,
88};
89use crate::connector_common::{IcebergCommon, IcebergSinkCompactionUpdate, IcebergTableIdentifier};
90use crate::enforce_secret::EnforceSecret;
91use crate::sink::catalog::SinkId;
92use crate::sink::coordinate::CoordinatedLogSinker;
93use crate::sink::writer::SinkWriter;
94use crate::sink::{
95    Result, SinglePhaseCommitCoordinator, SinkCommitCoordinator, SinkParam,
96    TwoPhaseCommitCoordinator,
97};
98use crate::{deserialize_bool_from_string, deserialize_optional_string_seq_from_string};
99
100pub const ICEBERG_SINK: &str = "iceberg";
101
102pub const ICEBERG_COW_BRANCH: &str = "ingestion";
103pub const ICEBERG_WRITE_MODE_MERGE_ON_READ: &str = "merge-on-read";
104pub const ICEBERG_WRITE_MODE_COPY_ON_WRITE: &str = "copy-on-write";
105pub const ICEBERG_COMPACTION_TYPE_FULL: &str = "full";
106pub const ICEBERG_COMPACTION_TYPE_SMALL_FILES: &str = "small-files";
107pub const ICEBERG_COMPACTION_TYPE_FILES_WITH_DELETE: &str = "files-with-delete";
108
109pub const PARTITION_DATA_ID_START: i32 = 1000;
110
111#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default, Serialize, Deserialize)]
112#[serde(rename_all = "kebab-case")]
113pub enum IcebergWriteMode {
114    #[default]
115    MergeOnRead,
116    CopyOnWrite,
117}
118
119impl IcebergWriteMode {
120    pub fn as_str(self) -> &'static str {
121        match self {
122            IcebergWriteMode::MergeOnRead => ICEBERG_WRITE_MODE_MERGE_ON_READ,
123            IcebergWriteMode::CopyOnWrite => ICEBERG_WRITE_MODE_COPY_ON_WRITE,
124        }
125    }
126}
127
128impl std::str::FromStr for IcebergWriteMode {
129    type Err = SinkError;
130
131    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
132        match s {
133            ICEBERG_WRITE_MODE_MERGE_ON_READ => Ok(IcebergWriteMode::MergeOnRead),
134            ICEBERG_WRITE_MODE_COPY_ON_WRITE => Ok(IcebergWriteMode::CopyOnWrite),
135            _ => Err(SinkError::Config(anyhow!(format!(
136                "invalid write_mode: {}, must be one of: {}, {}",
137                s, ICEBERG_WRITE_MODE_MERGE_ON_READ, ICEBERG_WRITE_MODE_COPY_ON_WRITE
138            )))),
139        }
140    }
141}
142
143impl TryFrom<&str> for IcebergWriteMode {
144    type Error = <Self as std::str::FromStr>::Err;
145
146    fn try_from(value: &str) -> std::result::Result<Self, Self::Error> {
147        value.parse()
148    }
149}
150
151impl TryFrom<String> for IcebergWriteMode {
152    type Error = <Self as std::str::FromStr>::Err;
153
154    fn try_from(value: String) -> std::result::Result<Self, Self::Error> {
155        value.as_str().parse()
156    }
157}
158
159impl std::fmt::Display for IcebergWriteMode {
160    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
161        f.write_str(self.as_str())
162    }
163}
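// Illustrative round-trip based on the `FromStr`/`Display` impls above:
// `"merge-on-read".parse::<IcebergWriteMode>()` yields `Ok(IcebergWriteMode::MergeOnRead)`,
// and `IcebergWriteMode::CopyOnWrite.to_string()` yields `"copy-on-write"`.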
164
165// Configuration constants
166pub const ENABLE_COMPACTION: &str = "enable_compaction";
167pub const COMPACTION_INTERVAL_SEC: &str = "compaction_interval_sec";
168pub const ENABLE_SNAPSHOT_EXPIRATION: &str = "enable_snapshot_expiration";
169pub const WRITE_MODE: &str = "write_mode";
170pub const SNAPSHOT_EXPIRATION_RETAIN_LAST: &str = "snapshot_expiration_retain_last";
171pub const SNAPSHOT_EXPIRATION_MAX_AGE_MILLIS: &str = "snapshot_expiration_max_age_millis";
172pub const SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_FILES: &str = "snapshot_expiration_clear_expired_files";
173pub const SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_META_DATA: &str =
174    "snapshot_expiration_clear_expired_meta_data";
175pub const COMPACTION_MAX_SNAPSHOTS_NUM: &str = "compaction.max_snapshots_num";
176
177pub const COMPACTION_SMALL_FILES_THRESHOLD_MB: &str = "compaction.small_files_threshold_mb";
178
179pub const COMPACTION_DELETE_FILES_COUNT_THRESHOLD: &str = "compaction.delete_files_count_threshold";
180
181pub const COMPACTION_TRIGGER_SNAPSHOT_COUNT: &str = "compaction.trigger_snapshot_count";
182
183pub const COMPACTION_TARGET_FILE_SIZE_MB: &str = "compaction.target_file_size_mb";
184
185pub const COMPACTION_TYPE: &str = "compaction.type";
186
187fn default_commit_retry_num() -> u32 {
188    8
189}
190
191fn default_iceberg_write_mode() -> IcebergWriteMode {
192    IcebergWriteMode::MergeOnRead
193}
194
195fn default_true() -> bool {
196    true
197}
198
199fn default_some_true() -> Option<bool> {
200    Some(true)
201}
202
203/// Compaction type for Iceberg sink
204#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
205#[serde(rename_all = "kebab-case")]
206pub enum CompactionType {
207    /// Full compaction - rewrites all data files
208    #[default]
209    Full,
210    /// Small files compaction - only compact small files
211    SmallFiles,
212    /// Files with delete compaction - only compact files that have associated delete files
213    FilesWithDelete,
214}
215
216impl CompactionType {
217    pub fn as_str(&self) -> &'static str {
218        match self {
219            CompactionType::Full => ICEBERG_COMPACTION_TYPE_FULL,
220            CompactionType::SmallFiles => ICEBERG_COMPACTION_TYPE_SMALL_FILES,
221            CompactionType::FilesWithDelete => ICEBERG_COMPACTION_TYPE_FILES_WITH_DELETE,
222        }
223    }
224}
225
226impl std::str::FromStr for CompactionType {
227    type Err = SinkError;
228
229    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
230        match s {
231            ICEBERG_COMPACTION_TYPE_FULL => Ok(CompactionType::Full),
232            ICEBERG_COMPACTION_TYPE_SMALL_FILES => Ok(CompactionType::SmallFiles),
233            ICEBERG_COMPACTION_TYPE_FILES_WITH_DELETE => Ok(CompactionType::FilesWithDelete),
234            _ => Err(SinkError::Config(anyhow!(format!(
235                "invalid compaction_type: {}, must be one of: {}, {}, {}",
236                s,
237                ICEBERG_COMPACTION_TYPE_FULL,
238                ICEBERG_COMPACTION_TYPE_SMALL_FILES,
239                ICEBERG_COMPACTION_TYPE_FILES_WITH_DELETE
240            )))),
241        }
242    }
243}
244
245impl TryFrom<&str> for CompactionType {
246    type Error = <Self as std::str::FromStr>::Err;
247
248    fn try_from(value: &str) -> std::result::Result<Self, Self::Error> {
249        value.parse()
250    }
251}
252
253impl TryFrom<String> for CompactionType {
254    type Error = <Self as std::str::FromStr>::Err;
255
256    fn try_from(value: String) -> std::result::Result<Self, Self::Error> {
257        value.as_str().parse()
258    }
259}
260
261impl std::fmt::Display for CompactionType {
262    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
263        write!(f, "{}", self.as_str())
264    }
265}
266
267#[serde_as]
268#[derive(Debug, Clone, PartialEq, Eq, Deserialize, WithOptions)]
269pub struct IcebergConfig {
270    pub r#type: String, // accept "append-only" or "upsert"
271
272    #[serde(default, deserialize_with = "deserialize_bool_from_string")]
273    pub force_append_only: bool,
274
275    #[serde(flatten)]
276    common: IcebergCommon,
277
278    #[serde(flatten)]
279    table: IcebergTableIdentifier,
280
281    #[serde(
282        rename = "primary_key",
283        default,
284        deserialize_with = "deserialize_optional_string_seq_from_string"
285    )]
286    pub primary_key: Option<Vec<String>>,
287
288    // Props for java catalog props.
289    #[serde(skip)]
290    pub java_catalog_props: HashMap<String, String>,
291
292    #[serde(default)]
293    pub partition_by: Option<String>,
294
    /// Commit every n (> 0) checkpoints; the default is 60.
296    #[serde(default = "iceberg_default_commit_checkpoint_interval")]
297    #[serde_as(as = "DisplayFromStr")]
298    #[with_option(allow_alter_on_fly)]
299    pub commit_checkpoint_interval: u64,
300
301    #[serde(default, deserialize_with = "deserialize_bool_from_string")]
302    pub create_table_if_not_exists: bool,
303
    /// Whether to use `exactly_once` semantics; the default is true.
305    #[serde(default = "default_some_true")]
306    #[serde_as(as = "Option<DisplayFromStr>")]
307    pub is_exactly_once: Option<bool>,
    // Number of times to retry when an Iceberg commit fails; the default is 8.
    // # TODO
    // The Iceberg table may store the commit retry count in its table metadata.
    // We should look that up and use it as the default commit retry count first.
312    #[serde(default = "default_commit_retry_num")]
313    pub commit_retry_num: u32,
314
315    /// Whether to enable iceberg compaction.
316    #[serde(
317        rename = "enable_compaction",
318        default,
319        deserialize_with = "deserialize_bool_from_string"
320    )]
321    #[with_option(allow_alter_on_fly)]
322    pub enable_compaction: bool,
323
    /// The interval (in seconds) between Iceberg compactions.
325    #[serde(rename = "compaction_interval_sec", default)]
326    #[serde_as(as = "Option<DisplayFromStr>")]
327    #[with_option(allow_alter_on_fly)]
328    pub compaction_interval_sec: Option<u64>,
329
    /// Whether to enable Iceberg snapshot expiration.
331    #[serde(
332        rename = "enable_snapshot_expiration",
333        default,
334        deserialize_with = "deserialize_bool_from_string"
335    )]
336    #[with_option(allow_alter_on_fly)]
337    pub enable_snapshot_expiration: bool,
338
339    /// The iceberg write mode, can be `merge-on-read` or `copy-on-write`.
340    #[serde(rename = "write_mode", default = "default_iceberg_write_mode")]
341    pub write_mode: IcebergWriteMode,
342
    /// The maximum age (in milliseconds) of snapshots before they expire.
    /// For example, if set to 3600000, snapshots older than one hour are expired.
345    #[serde(rename = "snapshot_expiration_max_age_millis", default)]
346    #[serde_as(as = "Option<DisplayFromStr>")]
347    #[with_option(allow_alter_on_fly)]
348    pub snapshot_expiration_max_age_millis: Option<i64>,
349
    /// The number of most recent snapshots to retain.
351    #[serde(rename = "snapshot_expiration_retain_last", default)]
352    #[serde_as(as = "Option<DisplayFromStr>")]
353    #[with_option(allow_alter_on_fly)]
354    pub snapshot_expiration_retain_last: Option<i32>,
355
356    #[serde(
357        rename = "snapshot_expiration_clear_expired_files",
358        default = "default_true",
359        deserialize_with = "deserialize_bool_from_string"
360    )]
361    #[with_option(allow_alter_on_fly)]
362    pub snapshot_expiration_clear_expired_files: bool,
363
364    #[serde(
365        rename = "snapshot_expiration_clear_expired_meta_data",
366        default = "default_true",
367        deserialize_with = "deserialize_bool_from_string"
368    )]
369    #[with_option(allow_alter_on_fly)]
370    pub snapshot_expiration_clear_expired_meta_data: bool,
371
    /// The maximum number of snapshots allowed since the last rewrite operation.
    /// If set, the sink checks the snapshot count and waits when it is exceeded.
374    #[serde(rename = "compaction.max_snapshots_num", default)]
375    #[serde_as(as = "Option<DisplayFromStr>")]
376    #[with_option(allow_alter_on_fly)]
377    pub max_snapshots_num_before_compaction: Option<usize>,
378
379    #[serde(rename = "compaction.small_files_threshold_mb", default)]
380    #[serde_as(as = "Option<DisplayFromStr>")]
381    #[with_option(allow_alter_on_fly)]
382    pub small_files_threshold_mb: Option<u64>,
383
384    #[serde(rename = "compaction.delete_files_count_threshold", default)]
385    #[serde_as(as = "Option<DisplayFromStr>")]
386    #[with_option(allow_alter_on_fly)]
387    pub delete_files_count_threshold: Option<usize>,
388
389    #[serde(rename = "compaction.trigger_snapshot_count", default)]
390    #[serde_as(as = "Option<DisplayFromStr>")]
391    #[with_option(allow_alter_on_fly)]
392    pub trigger_snapshot_count: Option<usize>,
393
394    #[serde(rename = "compaction.target_file_size_mb", default)]
395    #[serde_as(as = "Option<DisplayFromStr>")]
396    #[with_option(allow_alter_on_fly)]
397    pub target_file_size_mb: Option<u64>,
398
    /// Compaction type: `full`, `small-files`, or `files-with-delete`.
    /// Defaults to `full` if not set.
401    #[serde(rename = "compaction.type", default)]
402    #[with_option(allow_alter_on_fly)]
403    pub compaction_type: Option<CompactionType>,
404}
405
406impl EnforceSecret for IcebergConfig {
407    fn enforce_secret<'a>(
408        prop_iter: impl Iterator<Item = &'a str>,
409    ) -> crate::error::ConnectorResult<()> {
410        for prop in prop_iter {
411            IcebergCommon::enforce_one(prop)?;
412        }
413        Ok(())
414    }
415
416    fn enforce_one(prop: &str) -> crate::error::ConnectorResult<()> {
417        IcebergCommon::enforce_one(prop)
418    }
419}
420
421impl IcebergConfig {
422    pub fn from_btreemap(values: BTreeMap<String, String>) -> Result<Self> {
423        let mut config =
424            serde_json::from_value::<IcebergConfig>(serde_json::to_value(&values).unwrap())
425                .map_err(|e| SinkError::Config(anyhow!(e)))?;
426
427        if config.r#type != SINK_TYPE_APPEND_ONLY && config.r#type != SINK_TYPE_UPSERT {
428            return Err(SinkError::Config(anyhow!(
429                "`{}` must be {}, or {}",
430                SINK_TYPE_OPTION,
431                SINK_TYPE_APPEND_ONLY,
432                SINK_TYPE_UPSERT
433            )));
434        }
435
436        if config.r#type == SINK_TYPE_UPSERT {
437            if let Some(primary_key) = &config.primary_key {
438                if primary_key.is_empty() {
439                    return Err(SinkError::Config(anyhow!(
440                        "`primary-key` must not be empty in {}",
441                        SINK_TYPE_UPSERT
442                    )));
443                }
444            } else {
445                return Err(SinkError::Config(anyhow!(
446                    "Must set `primary-key` in {}",
447                    SINK_TYPE_UPSERT
448                )));
449            }
450        }
451
        // All configs starting with "catalog." are treated as Java catalog configs.
453        config.java_catalog_props = values
454            .iter()
455            .filter(|(k, _v)| {
456                k.starts_with("catalog.")
457                    && k != &"catalog.uri"
458                    && k != &"catalog.type"
459                    && k != &"catalog.name"
460                    && k != &"catalog.header"
461            })
462            .map(|(k, v)| (k[8..].to_string(), v.clone()))
463            .collect();
464
465        if config.commit_checkpoint_interval == 0 {
466            return Err(SinkError::Config(anyhow!(
467                "`commit-checkpoint-interval` must be greater than 0"
468            )));
469        }
470
471        Ok(config)
472    }
473
474    pub fn catalog_type(&self) -> &str {
475        self.common.catalog_type()
476    }
477
478    pub async fn load_table(&self) -> Result<Table> {
479        self.common
480            .load_table(&self.table, &self.java_catalog_props)
481            .await
482            .map_err(Into::into)
483    }
484
485    pub async fn create_catalog(&self) -> Result<Arc<dyn Catalog>> {
486        self.common
487            .create_catalog(&self.java_catalog_props)
488            .await
489            .map_err(Into::into)
490    }
491
492    pub fn full_table_name(&self) -> Result<TableIdent> {
493        self.table.to_table_ident().map_err(Into::into)
494    }
495
496    pub fn catalog_name(&self) -> String {
497        self.common.catalog_name()
498    }
499
500    pub fn compaction_interval_sec(&self) -> u64 {
501        // default to 1 hour
502        self.compaction_interval_sec.unwrap_or(3600)
503    }
504
    /// Calculates the timestamp (in milliseconds) before which snapshots should be expired.
    /// Returns `current_time_ms` - `max_age_millis`.
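    /// For example, with `snapshot_expiration_max_age_millis` set to 3_600_000 (one hour),
    /// the returned cutoff is one hour before `current_time_ms`.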
507    pub fn snapshot_expiration_timestamp_ms(&self, current_time_ms: i64) -> Option<i64> {
508        self.snapshot_expiration_max_age_millis
509            .map(|max_age_millis| current_time_ms - max_age_millis)
510    }
511
512    pub fn trigger_snapshot_count(&self) -> usize {
513        self.trigger_snapshot_count.unwrap_or(16)
514    }
515
516    pub fn small_files_threshold_mb(&self) -> u64 {
517        self.small_files_threshold_mb.unwrap_or(64)
518    }
519
520    pub fn delete_files_count_threshold(&self) -> usize {
521        self.delete_files_count_threshold.unwrap_or(256)
522    }
523
524    pub fn target_file_size_mb(&self) -> u64 {
525        self.target_file_size_mb.unwrap_or(1024)
526    }
527
    /// Returns the configured compaction type, falling back to the default (`full`) if unset.
530    pub fn compaction_type(&self) -> CompactionType {
531        self.compaction_type.unwrap_or_default()
532    }
533}
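// Illustrative sketch of how an `IcebergConfig` is built from sink WITH options. The option
// values below are hypothetical examples, and the connection/table options from the flattened
// `IcebergCommon` / `IcebergTableIdentifier` are omitted:
//
//     let props: BTreeMap<String, String> = [
//         ("type", "upsert"),
//         ("primary_key", "id"),
//         ("write_mode", "merge-on-read"),
//         ("commit_checkpoint_interval", "10"),
//     ]
//     .into_iter()
//     .map(|(k, v)| (k.to_owned(), v.to_owned()))
//     .collect();
//     let config = IcebergConfig::from_btreemap(props)?;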
534
535pub struct IcebergSink {
536    pub config: IcebergConfig,
537    param: SinkParam,
    // In upsert mode, this is never `None` and never empty.
539    unique_column_ids: Option<Vec<usize>>,
540}
541
542impl EnforceSecret for IcebergSink {
543    fn enforce_secret<'a>(
544        prop_iter: impl Iterator<Item = &'a str>,
545    ) -> crate::error::ConnectorResult<()> {
546        for prop in prop_iter {
547            IcebergConfig::enforce_one(prop)?;
548        }
549        Ok(())
550    }
551}
552
553impl TryFrom<SinkParam> for IcebergSink {
554    type Error = SinkError;
555
556    fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
557        let config = IcebergConfig::from_btreemap(param.properties.clone())?;
558        IcebergSink::new(config, param)
559    }
560}
561
562impl Debug for IcebergSink {
563    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
564        f.debug_struct("IcebergSink")
565            .field("config", &self.config)
566            .finish()
567    }
568}
569
570async fn create_and_validate_table_impl(
571    config: &IcebergConfig,
572    param: &SinkParam,
573) -> Result<Table> {
574    if config.create_table_if_not_exists {
575        create_table_if_not_exists_impl(config, param).await?;
576    }
577
578    let table = config
579        .load_table()
580        .await
581        .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
582
583    let sink_schema = param.schema();
584    let iceberg_arrow_schema = schema_to_arrow_schema(table.metadata().current_schema())
585        .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
586
587    try_matches_arrow_schema(&sink_schema, &iceberg_arrow_schema)
588        .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
589
590    Ok(table)
591}
592
593async fn create_table_if_not_exists_impl(config: &IcebergConfig, param: &SinkParam) -> Result<()> {
594    let catalog = config.create_catalog().await?;
595    let namespace = if let Some(database_name) = config.table.database_name() {
596        let namespace = NamespaceIdent::new(database_name.to_owned());
597        if !catalog
598            .namespace_exists(&namespace)
599            .await
600            .map_err(|e| SinkError::Iceberg(anyhow!(e)))?
601        {
602            catalog
603                .create_namespace(&namespace, HashMap::default())
604                .await
605                .map_err(|e| SinkError::Iceberg(anyhow!(e)))
606                .context("failed to create iceberg namespace")?;
607        }
608        namespace
609    } else {
610        bail!("database name must be set if you want to create table")
611    };
612
613    let table_id = config
614        .full_table_name()
615        .context("Unable to parse table name")?;
616    if !catalog
617        .table_exists(&table_id)
618        .await
619        .map_err(|e| SinkError::Iceberg(anyhow!(e)))?
620    {
621        let iceberg_create_table_arrow_convert = IcebergCreateTableArrowConvert::default();
622        // convert risingwave schema -> arrow schema -> iceberg schema
623        let arrow_fields = param
624            .columns
625            .iter()
626            .map(|column| {
627                Ok(iceberg_create_table_arrow_convert
628                    .to_arrow_field(&column.name, &column.data_type)
629                    .map_err(|e| SinkError::Iceberg(anyhow!(e)))
630                    .context(format!(
631                        "failed to convert {}: {} to arrow type",
632                        &column.name, &column.data_type
633                    ))?)
634            })
635            .collect::<Result<Vec<ArrowField>>>()?;
636        let arrow_schema = arrow_schema_iceberg::Schema::new(arrow_fields);
637        let iceberg_schema = iceberg::arrow::arrow_schema_to_schema(&arrow_schema)
638            .map_err(|e| SinkError::Iceberg(anyhow!(e)))
639            .context("failed to convert arrow schema to iceberg schema")?;
640
641        let location = {
642            let mut names = namespace.clone().inner();
643            names.push(config.table.table_name().to_owned());
644            match &config.common.warehouse_path {
645                Some(warehouse_path) => {
646                    let is_s3_tables = warehouse_path.starts_with("arn:aws:s3tables");
647                    let url = Url::parse(warehouse_path);
648                    if url.is_err() || is_s3_tables {
                        // For a REST catalog, the warehouse_path could be a warehouse name
                        // rather than a URL. In that case, leave the location unset so the
                        // catalog assigns it when the table is created.
651                        if config.common.catalog_type() == "rest"
652                            || config.common.catalog_type() == "rest_rust"
653                        {
654                            None
655                        } else {
656                            bail!(format!("Invalid warehouse path: {}", warehouse_path))
657                        }
658                    } else if warehouse_path.ends_with('/') {
659                        Some(format!("{}{}", warehouse_path, names.join("/")))
660                    } else {
661                        Some(format!("{}/{}", warehouse_path, names.join("/")))
662                    }
663                }
664                None => None,
665            }
666        };
667
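        // `partition_by` is a comma-separated list of partition expressions (e.g. a plain
        // column name or a transform such as `bucket(16, col)`); the exact syntax accepted
        // here is defined by `parse_partition_by_exprs`.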
668        let partition_spec = match &config.partition_by {
669            Some(partition_by) => {
670                let mut partition_fields = Vec::<UnboundPartitionField>::new();
671                for (i, (column, transform)) in parse_partition_by_exprs(partition_by.clone())?
672                    .into_iter()
673                    .enumerate()
674                {
675                    match iceberg_schema.field_id_by_name(&column) {
676                        Some(id) => partition_fields.push(
677                            UnboundPartitionField::builder()
678                                .source_id(id)
679                                .transform(transform)
680                                .name(format!("_p_{}", column))
681                                .field_id(PARTITION_DATA_ID_START + i as i32)
682                                .build(),
683                        ),
684                        None => bail!(format!(
685                            "Partition source column does not exist in schema: {}",
686                            column
687                        )),
688                    };
689                }
690                Some(
691                    UnboundPartitionSpec::builder()
692                        .with_spec_id(0)
693                        .add_partition_fields(partition_fields)
694                        .map_err(|e| SinkError::Iceberg(anyhow!(e)))
695                        .context("failed to add partition columns")?
696                        .build(),
697                )
698            }
699            None => None,
700        };
701
702        let table_creation_builder = TableCreation::builder()
703            .name(config.table.table_name().to_owned())
704            .schema(iceberg_schema);
705
706        let table_creation = match (location, partition_spec) {
707            (Some(location), Some(partition_spec)) => table_creation_builder
708                .location(location)
709                .partition_spec(partition_spec)
710                .build(),
711            (Some(location), None) => table_creation_builder.location(location).build(),
712            (None, Some(partition_spec)) => table_creation_builder
713                .partition_spec(partition_spec)
714                .build(),
715            (None, None) => table_creation_builder.build(),
716        };
717
718        catalog
719            .create_table(&namespace, table_creation)
720            .await
721            .map_err(|e| SinkError::Iceberg(anyhow!(e)))
722            .context("failed to create iceberg table")?;
723    }
724    Ok(())
725}
726
727impl IcebergSink {
728    pub async fn create_and_validate_table(&self) -> Result<Table> {
729        create_and_validate_table_impl(&self.config, &self.param).await
730    }
731
732    pub async fn create_table_if_not_exists(&self) -> Result<()> {
733        create_table_if_not_exists_impl(&self.config, &self.param).await
734    }
735
736    pub fn new(config: IcebergConfig, param: SinkParam) -> Result<Self> {
737        let unique_column_ids = if config.r#type == SINK_TYPE_UPSERT && !config.force_append_only {
738            if let Some(pk) = &config.primary_key {
739                let mut unique_column_ids = Vec::with_capacity(pk.len());
740                for col_name in pk {
741                    let id = param
742                        .columns
743                        .iter()
744                        .find(|col| col.name.as_str() == col_name)
745                        .ok_or_else(|| {
746                            SinkError::Config(anyhow!(
747                                "Primary key column {} not found in sink schema",
748                                col_name
749                            ))
750                        })?
751                        .column_id
752                        .get_id() as usize;
753                    unique_column_ids.push(id);
754                }
755                Some(unique_column_ids)
756            } else {
757                unreachable!()
758            }
759        } else {
760            None
761        };
762        Ok(Self {
763            config,
764            param,
765            unique_column_ids,
766        })
767    }
768}
769
770impl Sink for IcebergSink {
771    type LogSinker = CoordinatedLogSinker<IcebergSinkWriter>;
772
773    const SINK_NAME: &'static str = ICEBERG_SINK;
774
775    async fn validate(&self) -> Result<()> {
776        if "snowflake".eq_ignore_ascii_case(self.config.catalog_type()) {
777            bail!("Snowflake catalog only supports iceberg sources");
778        }
779
780        if "glue".eq_ignore_ascii_case(self.config.catalog_type()) {
781            risingwave_common::license::Feature::IcebergSinkWithGlue
782                .check_available()
783                .map_err(|e| anyhow::anyhow!(e))?;
784        }
785
786        // Validate compaction type configuration
787        let compaction_type = self.config.compaction_type();
788
789        // Check COW mode constraints
790        // COW mode only supports 'full' compaction type
791        if self.config.write_mode == IcebergWriteMode::CopyOnWrite
792            && compaction_type != CompactionType::Full
793        {
794            bail!(
795                "'copy-on-write' mode only supports 'full' compaction type, got: '{}'",
796                compaction_type
797            );
798        }
799
800        match compaction_type {
801            CompactionType::SmallFiles => {
802                // 1. check license
803                risingwave_common::license::Feature::IcebergCompaction
804                    .check_available()
805                    .map_err(|e| anyhow::anyhow!(e))?;
806
807                // 2. check write mode
808                if self.config.write_mode != IcebergWriteMode::MergeOnRead {
809                    bail!(
810                        "'small-files' compaction type only supports 'merge-on-read' write mode, got: '{}'",
811                        self.config.write_mode
812                    );
813                }
814
815                // 3. check conflicting parameters
816                if self.config.delete_files_count_threshold.is_some() {
817                    bail!(
818                        "`compaction.delete-files-count-threshold` is not supported for 'small-files' compaction type"
819                    );
820                }
821            }
822            CompactionType::FilesWithDelete => {
823                // 1. check license
824                risingwave_common::license::Feature::IcebergCompaction
825                    .check_available()
826                    .map_err(|e| anyhow::anyhow!(e))?;
827
828                // 2. check write mode
829                if self.config.write_mode != IcebergWriteMode::MergeOnRead {
830                    bail!(
831                        "'files-with-delete' compaction type only supports 'merge-on-read' write mode, got: '{}'",
832                        self.config.write_mode
833                    );
834                }
835
836                // 3. check conflicting parameters
837                if self.config.small_files_threshold_mb.is_some() {
838                    bail!(
839                        "`compaction.small-files-threshold-mb` must not be set for 'files-with-delete' compaction type"
840                    );
841                }
842            }
843            CompactionType::Full => {
844                // Full compaction has no special requirements
845            }
846        }
847
848        let _ = self.create_and_validate_table().await?;
849        Ok(())
850    }
851
852    fn support_schema_change() -> bool {
853        true
854    }
855
856    fn validate_alter_config(config: &BTreeMap<String, String>) -> Result<()> {
857        let iceberg_config = IcebergConfig::from_btreemap(config.clone())?;
858
859        // Validate compaction interval
860        if let Some(compaction_interval) = iceberg_config.compaction_interval_sec {
861            if iceberg_config.enable_compaction && compaction_interval == 0 {
862                bail!(
863                    "`compaction-interval-sec` must be greater than 0 when `enable-compaction` is true"
864                );
865            }
866
867            // log compaction_interval
868            tracing::info!(
869                "Alter config compaction_interval set to {} seconds",
870                compaction_interval
871            );
872        }
873
874        // Validate max snapshots
875        if let Some(max_snapshots) = iceberg_config.max_snapshots_num_before_compaction
876            && max_snapshots < 1
877        {
878            bail!(
879                "`compaction.max-snapshots-num` must be greater than 0, got: {}",
880                max_snapshots
881            );
882        }
883
884        Ok(())
885    }
886
887    async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
888        let writer = IcebergSinkWriter::new(
889            self.config.clone(),
890            self.param.clone(),
891            writer_param.clone(),
892            self.unique_column_ids.clone(),
893        );
894
895        let commit_checkpoint_interval =
896            NonZeroU64::new(self.config.commit_checkpoint_interval).expect(
897                "commit_checkpoint_interval should be greater than 0, and it should be checked in config validation",
898            );
899        let log_sinker = CoordinatedLogSinker::new(
900            &writer_param,
901            self.param.clone(),
902            writer,
903            commit_checkpoint_interval,
904        )
905        .await?;
906
907        Ok(log_sinker)
908    }
909
910    fn is_coordinated_sink(&self) -> bool {
911        true
912    }
913
914    async fn new_coordinator(
915        &self,
916        iceberg_compact_stat_sender: Option<UnboundedSender<IcebergSinkCompactionUpdate>>,
917    ) -> Result<SinkCommitCoordinator> {
918        let catalog = self.config.create_catalog().await?;
919        let table = self.create_and_validate_table().await?;
920        let coordinator = IcebergSinkCommitter {
921            catalog,
922            table,
923            last_commit_epoch: 0,
924            sink_id: self.param.sink_id,
925            config: self.config.clone(),
926            param: self.param.clone(),
927            commit_retry_num: self.config.commit_retry_num,
928            iceberg_compact_stat_sender,
929        };
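        // `is_exactly_once` defaults to true, so commits normally go through the two-phase
        // coordinator; setting it to false falls back to the single-phase coordinator.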
930        if self.config.is_exactly_once.unwrap_or_default() {
931            Ok(SinkCommitCoordinator::TwoPhase(Box::new(coordinator)))
932        } else {
933            Ok(SinkCommitCoordinator::SinglePhase(Box::new(coordinator)))
934        }
935    }
936}
937
/// `None` means no projection is needed.
/// `Prepare` holds the index of the extra partition column.
/// `Done` holds the projection index vec.
///
/// The `ProjectIdxVec` is evaluated lazily: when the `Prepare` state is first encountered, the
/// data chunk schema is used to build the projection index vec.
944enum ProjectIdxVec {
945    None,
946    Prepare(usize),
947    Done(Vec<usize>),
948}
949
950type DataFileWriterBuilderType =
951    DataFileWriterBuilder<ParquetWriterBuilder, DefaultLocationGenerator, DefaultFileNameGenerator>;
952type PositionDeleteFileWriterBuilderType = PositionDeleteFileWriterBuilder<
953    ParquetWriterBuilder,
954    DefaultLocationGenerator,
955    DefaultFileNameGenerator,
956>;
957type EqualityDeleteFileWriterBuilderType = EqualityDeleteFileWriterBuilder<
958    ParquetWriterBuilder,
959    DefaultLocationGenerator,
960    DefaultFileNameGenerator,
961>;
962
963#[derive(Clone)]
964struct TaskWriterBuilderWrapper<B: IcebergWriterBuilder> {
965    inner: B,
966    fanout_enabled: bool,
967    schema: IcebergSchemaRef,
968    partition_spec: PartitionSpecRef,
969    compute_partition: bool,
970}
971
972impl<B: IcebergWriterBuilder> TaskWriterBuilderWrapper<B> {
973    fn new(
974        inner: B,
975        fanout_enabled: bool,
976        schema: IcebergSchemaRef,
977        partition_spec: PartitionSpecRef,
978        compute_partition: bool,
979    ) -> Self {
980        Self {
981            inner,
982            fanout_enabled,
983            schema,
984            partition_spec,
985            compute_partition,
986        }
987    }
988
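    // Builds the `TaskWriter`. For a partitioned table a `RecordBatchPartitionSplitter` is
    // attached: with `compute_partition` the partition values are computed from the record
    // batch itself, otherwise precomputed partition values in the batch are used.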
989    fn build(self) -> iceberg::Result<TaskWriter<B>> {
990        let partition_splitter = match (
991            self.partition_spec.is_unpartitioned(),
992            self.compute_partition,
993        ) {
994            (true, _) => None,
995            (false, true) => Some(RecordBatchPartitionSplitter::new_with_computed_values(
996                self.schema.clone(),
997                self.partition_spec.clone(),
998            )?),
999            (false, false) => Some(RecordBatchPartitionSplitter::new_with_precomputed_values(
1000                self.schema.clone(),
1001                self.partition_spec.clone(),
1002            )?),
1003        };
1004
1005        Ok(TaskWriter::new_with_partition_splitter(
1006            self.inner,
1007            self.fanout_enabled,
1008            self.schema,
1009            self.partition_spec,
1010            partition_splitter,
1011        ))
1012    }
1013}
1014
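// The sink writer starts in `Created`, holding only its construction arguments; on the first
// `begin_epoch` it loads and validates the Iceberg table and transitions to `Initialized`.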
1015pub enum IcebergSinkWriter {
1016    Created(IcebergSinkWriterArgs),
1017    Initialized(IcebergSinkWriterInner),
1018}
1019
1020pub struct IcebergSinkWriterArgs {
1021    config: IcebergConfig,
1022    sink_param: SinkParam,
1023    writer_param: SinkWriterParam,
1024    unique_column_ids: Option<Vec<usize>>,
1025}
1026
1027pub struct IcebergSinkWriterInner {
1028    writer: IcebergWriterDispatch,
1029    arrow_schema: SchemaRef,
1030    // See comments below
1031    metrics: IcebergWriterMetrics,
1032    // State of iceberg table for this writer
1033    table: Table,
    // For chunks with an extra partition column, that column must be removed before writing.
    // This projection index vec is cached to avoid recomputing it for every chunk.
1036    project_idx_vec: ProjectIdxVec,
1037}
1038
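// Append-only sinks write data files only; upsert sinks go through the delta writer, which
// additionally produces position/equality delete files and consumes an extra `op` column
// (see `arrow_schema_with_op_column`) marking each row as an insert or a delete.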
1039#[allow(clippy::type_complexity)]
1040enum IcebergWriterDispatch {
1041    Append {
1042        writer: Option<Box<dyn IcebergWriter>>,
1043        writer_builder:
1044            TaskWriterBuilderWrapper<MonitoredGeneralWriterBuilder<DataFileWriterBuilderType>>,
1045    },
1046    Upsert {
1047        writer: Option<Box<dyn IcebergWriter>>,
1048        writer_builder: TaskWriterBuilderWrapper<
1049            MonitoredGeneralWriterBuilder<
1050                DeltaWriterBuilder<
1051                    DataFileWriterBuilderType,
1052                    PositionDeleteFileWriterBuilderType,
1053                    EqualityDeleteFileWriterBuilderType,
1054                >,
1055            >,
1056        >,
1057        arrow_schema_with_op_column: SchemaRef,
1058    },
1059}
1060
1061impl IcebergWriterDispatch {
1062    pub fn get_writer(&mut self) -> Option<&mut Box<dyn IcebergWriter>> {
1063        match self {
1064            IcebergWriterDispatch::Append { writer, .. }
1065            | IcebergWriterDispatch::Upsert { writer, .. } => writer.as_mut(),
1066        }
1067    }
1068}
1069
1070pub struct IcebergWriterMetrics {
    // NOTE: These two metrics are not used directly by us but are kept for lifecycle management.
    // They are actually used in `PrometheusWriterBuilder`:
    //     WriterMetrics::new(write_qps.deref().clone(), write_latency.deref().clone())
    // We keep them here so the guard cleans the labels from the metrics registry when dropped.
1075    _write_qps: LabelGuardedIntCounter,
1076    _write_latency: LabelGuardedHistogram,
1077    write_bytes: LabelGuardedIntCounter,
1078}
1079
1080impl IcebergSinkWriter {
1081    pub fn new(
1082        config: IcebergConfig,
1083        sink_param: SinkParam,
1084        writer_param: SinkWriterParam,
1085        unique_column_ids: Option<Vec<usize>>,
1086    ) -> Self {
1087        Self::Created(IcebergSinkWriterArgs {
1088            config,
1089            sink_param,
1090            writer_param,
1091            unique_column_ids,
1092        })
1093    }
1094}
1095
1096impl IcebergSinkWriterInner {
1097    fn build_append_only(table: Table, writer_param: &SinkWriterParam) -> Result<Self> {
1098        let SinkWriterParam {
1099            extra_partition_col_idx,
1100            actor_id,
1101            sink_id,
1102            sink_name,
1103            ..
1104        } = writer_param;
1105        let metrics_labels = [
1106            &actor_id.to_string(),
1107            &sink_id.to_string(),
1108            sink_name.as_str(),
1109        ];
1110
1111        // Metrics
1112        let write_qps = GLOBAL_SINK_METRICS
1113            .iceberg_write_qps
1114            .with_guarded_label_values(&metrics_labels);
1115        let write_latency = GLOBAL_SINK_METRICS
1116            .iceberg_write_latency
1117            .with_guarded_label_values(&metrics_labels);
        // # TODO
        // Unused. Add this metric later.
1120        let _rolling_unflushed_data_file = GLOBAL_SINK_METRICS
1121            .iceberg_rolling_unflushed_data_file
1122            .with_guarded_label_values(&metrics_labels);
1123        let write_bytes = GLOBAL_SINK_METRICS
1124            .iceberg_write_bytes
1125            .with_guarded_label_values(&metrics_labels);
1126
1127        let schema = table.metadata().current_schema();
1128        let partition_spec = table.metadata().default_partition_spec();
1129        let fanout_enabled = !partition_spec.fields().is_empty();
1130
        // To avoid duplicate file names, a unique UUID is generated as the file name suffix each time the sink is created.
1132        let unique_uuid_suffix = Uuid::now_v7();
1133
1134        let parquet_writer_properties = WriterProperties::builder()
1135            .set_max_row_group_size(
1136                writer_param
1137                    .streaming_config
1138                    .developer
1139                    .iceberg_sink_write_parquet_max_row_group_rows,
1140            )
1141            .build();
1142
1143        let parquet_writer_builder =
1144            ParquetWriterBuilder::new(parquet_writer_properties, schema.clone());
1145        let rolling_builder = RollingFileWriterBuilder::new_with_default_file_size(
1146            parquet_writer_builder,
1147            table.file_io().clone(),
1148            DefaultLocationGenerator::new(table.metadata().clone())
1149                .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1150            DefaultFileNameGenerator::new(
1151                writer_param.actor_id.to_string(),
1152                Some(unique_uuid_suffix.to_string()),
1153                iceberg::spec::DataFileFormat::Parquet,
1154            ),
1155        );
1156        let data_file_builder = DataFileWriterBuilder::new(rolling_builder);
1157        let monitored_builder = MonitoredGeneralWriterBuilder::new(
1158            data_file_builder,
1159            write_qps.clone(),
1160            write_latency.clone(),
1161        );
1162        let writer_builder = TaskWriterBuilderWrapper::new(
1163            monitored_builder,
1164            fanout_enabled,
1165            schema.clone(),
1166            partition_spec.clone(),
1167            true,
1168        );
1169        let inner_writer = Some(Box::new(
1170            writer_builder
1171                .clone()
1172                .build()
1173                .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1174        ) as Box<dyn IcebergWriter>);
1175        Ok(Self {
1176            arrow_schema: Arc::new(
1177                schema_to_arrow_schema(table.metadata().current_schema())
1178                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1179            ),
1180            metrics: IcebergWriterMetrics {
1181                _write_qps: write_qps,
1182                _write_latency: write_latency,
1183                write_bytes,
1184            },
1185            writer: IcebergWriterDispatch::Append {
1186                writer: inner_writer,
1187                writer_builder,
1188            },
1189            table,
1190            project_idx_vec: {
1191                if let Some(extra_partition_col_idx) = extra_partition_col_idx {
1192                    ProjectIdxVec::Prepare(*extra_partition_col_idx)
1193                } else {
1194                    ProjectIdxVec::None
1195                }
1196            },
1197        })
1198    }
1199
1200    fn build_upsert(
1201        table: Table,
1202        unique_column_ids: Vec<usize>,
1203        writer_param: &SinkWriterParam,
1204    ) -> Result<Self> {
1205        let SinkWriterParam {
1206            extra_partition_col_idx,
1207            actor_id,
1208            sink_id,
1209            sink_name,
1210            ..
1211        } = writer_param;
1212        let metrics_labels = [
1213            &actor_id.to_string(),
1214            &sink_id.to_string(),
1215            sink_name.as_str(),
1216        ];
1217        let unique_column_ids: Vec<_> = unique_column_ids.into_iter().map(|id| id as i32).collect();
1218
1219        // Metrics
1220        let write_qps = GLOBAL_SINK_METRICS
1221            .iceberg_write_qps
1222            .with_guarded_label_values(&metrics_labels);
1223        let write_latency = GLOBAL_SINK_METRICS
1224            .iceberg_write_latency
1225            .with_guarded_label_values(&metrics_labels);
        // # TODO
        // Unused. Add this metric later.
1228        let _rolling_unflushed_data_file = GLOBAL_SINK_METRICS
1229            .iceberg_rolling_unflushed_data_file
1230            .with_guarded_label_values(&metrics_labels);
1231        let write_bytes = GLOBAL_SINK_METRICS
1232            .iceberg_write_bytes
1233            .with_guarded_label_values(&metrics_labels);
1234
        // Get the current schema and the default partition spec.
1236        let schema = table.metadata().current_schema();
1237        let partition_spec = table.metadata().default_partition_spec();
1238        let fanout_enabled = !partition_spec.fields().is_empty();
1239
        // To avoid duplicate file names, a unique UUID is generated as the file name suffix each time the sink is created.
1241        let unique_uuid_suffix = Uuid::now_v7();
1242
1243        let parquet_writer_properties = WriterProperties::builder()
1244            .set_max_row_group_size(
1245                writer_param
1246                    .streaming_config
1247                    .developer
1248                    .iceberg_sink_write_parquet_max_row_group_rows,
1249            )
1250            .build();
1251
1252        let data_file_builder = {
1253            let parquet_writer_builder =
1254                ParquetWriterBuilder::new(parquet_writer_properties.clone(), schema.clone());
1255            let rolling_writer_builder = RollingFileWriterBuilder::new_with_default_file_size(
1256                parquet_writer_builder,
1257                table.file_io().clone(),
1258                DefaultLocationGenerator::new(table.metadata().clone())
1259                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1260                DefaultFileNameGenerator::new(
1261                    writer_param.actor_id.to_string(),
1262                    Some(unique_uuid_suffix.to_string()),
1263                    iceberg::spec::DataFileFormat::Parquet,
1264                ),
1265            );
1266            DataFileWriterBuilder::new(rolling_writer_builder)
1267        };
1268        let position_delete_builder = {
1269            let parquet_writer_builder = ParquetWriterBuilder::new(
1270                parquet_writer_properties.clone(),
1271                POSITION_DELETE_SCHEMA.clone().into(),
1272            );
1273            let rolling_writer_builder = RollingFileWriterBuilder::new_with_default_file_size(
1274                parquet_writer_builder,
1275                table.file_io().clone(),
1276                DefaultLocationGenerator::new(table.metadata().clone())
1277                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1278                DefaultFileNameGenerator::new(
1279                    writer_param.actor_id.to_string(),
1280                    Some(format!("pos-del-{}", unique_uuid_suffix)),
1281                    iceberg::spec::DataFileFormat::Parquet,
1282                ),
1283            );
1284            PositionDeleteFileWriterBuilder::new(rolling_writer_builder)
1285        };
1286        let equality_delete_builder = {
1287            let config = EqualityDeleteWriterConfig::new(
1288                unique_column_ids.clone(),
1289                table.metadata().current_schema().clone(),
1290            )
1291            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
1292            let parquet_writer_builder = ParquetWriterBuilder::new(
1293                parquet_writer_properties,
1294                Arc::new(
1295                    arrow_schema_to_schema(config.projected_arrow_schema_ref())
1296                        .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1297                ),
1298            );
1299            let rolling_writer_builder = RollingFileWriterBuilder::new_with_default_file_size(
1300                parquet_writer_builder,
1301                table.file_io().clone(),
1302                DefaultLocationGenerator::new(table.metadata().clone())
1303                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1304                DefaultFileNameGenerator::new(
1305                    writer_param.actor_id.to_string(),
1306                    Some(format!("eq-del-{}", unique_uuid_suffix)),
1307                    iceberg::spec::DataFileFormat::Parquet,
1308                ),
1309            );
1310
1311            EqualityDeleteFileWriterBuilder::new(rolling_writer_builder, config)
1312        };
1313        let delta_builder = DeltaWriterBuilder::new(
1314            data_file_builder,
1315            position_delete_builder,
1316            equality_delete_builder,
1317            unique_column_ids,
1318            schema.clone(),
1319        );
1320        let original_arrow_schema = Arc::new(
1321            schema_to_arrow_schema(table.metadata().current_schema())
1322                .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1323        );
1324        let schema_with_extra_op_column = {
1325            let mut new_fields = original_arrow_schema.fields().iter().cloned().collect_vec();
1326            new_fields.push(Arc::new(ArrowField::new(
1327                "op".to_owned(),
1328                ArrowDataType::Int32,
1329                false,
1330            )));
1331            Arc::new(ArrowSchema::new(new_fields))
1332        };
1333        let writer_builder = TaskWriterBuilderWrapper::new(
1334            MonitoredGeneralWriterBuilder::new(
1335                delta_builder,
1336                write_qps.clone(),
1337                write_latency.clone(),
1338            ),
1339            fanout_enabled,
1340            schema.clone(),
1341            partition_spec.clone(),
1342            true,
1343        );
1344        let inner_writer = Some(Box::new(
1345            writer_builder
1346                .clone()
1347                .build()
1348                .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1349        ) as Box<dyn IcebergWriter>);
1350        Ok(Self {
1351            arrow_schema: original_arrow_schema,
1352            metrics: IcebergWriterMetrics {
1353                _write_qps: write_qps,
1354                _write_latency: write_latency,
1355                write_bytes,
1356            },
1357            table,
1358            writer: IcebergWriterDispatch::Upsert {
1359                writer: inner_writer,
1360                writer_builder,
1361                arrow_schema_with_op_column: schema_with_extra_op_column,
1362            },
1363            project_idx_vec: {
1364                if let Some(extra_partition_col_idx) = extra_partition_col_idx {
1365                    ProjectIdxVec::Prepare(*extra_partition_col_idx)
1366                } else {
1367                    ProjectIdxVec::None
1368                }
1369            },
1370        })
1371    }
1372}
1373
1374#[async_trait]
1375impl SinkWriter for IcebergSinkWriter {
1376    type CommitMetadata = Option<SinkMetadata>;
1377
1378    /// Begin a new epoch
1379    async fn begin_epoch(&mut self, _epoch: u64) -> Result<()> {
1380        let Self::Created(args) = self else {
1381            return Ok(());
1382        };
1383
1384        let table = create_and_validate_table_impl(&args.config, &args.sink_param).await?;
1385        let inner = match &args.unique_column_ids {
1386            Some(unique_column_ids) => IcebergSinkWriterInner::build_upsert(
1387                table,
1388                unique_column_ids.clone(),
1389                &args.writer_param,
1390            )?,
1391            None => IcebergSinkWriterInner::build_append_only(table, &args.writer_param)?,
1392        };
1393
1394        *self = IcebergSinkWriter::Initialized(inner);
1395        Ok(())
1396    }
1397
1398    /// Write a stream chunk to sink
1399    async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> {
1400        let Self::Initialized(inner) = self else {
            unreachable!("IcebergSinkWriter should be initialized before write_batch");
1402        };
1403
1404        // Try to build writer if it's None.
1405        match &mut inner.writer {
1406            IcebergWriterDispatch::Append {
1407                writer,
1408                writer_builder,
1409            } => {
1410                if writer.is_none() {
1411                    *writer = Some(Box::new(
1412                        writer_builder
1413                            .clone()
1414                            .build()
1415                            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1416                    ));
1417                }
1418            }
1419            IcebergWriterDispatch::Upsert {
1420                writer,
1421                writer_builder,
1422                ..
1423            } => {
1424                if writer.is_none() {
1425                    *writer = Some(Box::new(
1426                        writer_builder
1427                            .clone()
1428                            .build()
1429                            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1430                    ));
1431                }
1432            }
1433        };
1434
1435        // Process the chunk.
1436        let (mut chunk, ops) = chunk.compact_vis().into_parts();
1437        match &mut inner.project_idx_vec {
1438            ProjectIdxVec::None => {}
1439            ProjectIdxVec::Prepare(idx) => {
1440                if *idx >= chunk.columns().len() {
1441                    return Err(SinkError::Iceberg(anyhow!(
1442                        "invalid extra partition column index {}",
1443                        idx
1444                    )));
1445                }
1446                let project_idx_vec = (0..*idx)
1447                    .chain(*idx + 1..chunk.columns().len())
1448                    .collect_vec();
1449                chunk = chunk.project(&project_idx_vec);
1450                inner.project_idx_vec = ProjectIdxVec::Done(project_idx_vec);
1451            }
1452            ProjectIdxVec::Done(idx_vec) => {
1453                chunk = chunk.project(idx_vec);
1454            }
1455        }
1456        if ops.is_empty() {
1457            return Ok(());
1458        }
1459        let write_batch_size = chunk.estimated_heap_size();
1460        let batch = match &inner.writer {
1461            IcebergWriterDispatch::Append { .. } => {
1462                // separate out insert chunk
1463                let filters =
1464                    chunk.visibility() & ops.iter().map(|op| *op == Op::Insert).collect::<Bitmap>();
1465                chunk.set_visibility(filters);
1466                IcebergArrowConvert
1467                    .to_record_batch(inner.arrow_schema.clone(), &chunk.compact_vis())
1468                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?
1469            }
1470            IcebergWriterDispatch::Upsert {
1471                arrow_schema_with_op_column,
1472                ..
1473            } => {
1474                let chunk = IcebergArrowConvert
1475                    .to_record_batch(inner.arrow_schema.clone(), &chunk)
1476                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
1477                let ops = Arc::new(Int32Array::from(
1478                    ops.iter()
1479                        .map(|op| match op {
1480                            Op::UpdateInsert | Op::Insert => INSERT_OP,
1481                            Op::UpdateDelete | Op::Delete => DELETE_OP,
1482                        })
1483                        .collect_vec(),
1484                ));
1485                let mut columns = chunk.columns().to_vec();
1486                columns.push(ops);
1487                RecordBatch::try_new(arrow_schema_with_op_column.clone(), columns)
1488                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?
1489            }
1490        };
1491
1492        let writer = inner.writer.get_writer().unwrap();
1493        writer
1494            .write(batch)
1495            .instrument_await("iceberg_write")
1496            .await
1497            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
1498        inner.metrics.write_bytes.inc_by(write_batch_size as _);
1499        Ok(())
1500    }
1501
1502    /// Receive a barrier and mark the end of current epoch. When `is_checkpoint` is true, the sink
1503    /// writer should commit the current epoch.
1504    async fn barrier(&mut self, is_checkpoint: bool) -> Result<Option<SinkMetadata>> {
1505        let Self::Initialized(inner) = self else {
1506            unreachable!("IcebergSinkWriter should be initialized before barrier");
1507        };
1508
1509        // Skip it if not checkpoint
1510        if !is_checkpoint {
1511            return Ok(None);
1512        }
1513
1514        let close_result = match &mut inner.writer {
1515            IcebergWriterDispatch::Append {
1516                writer,
1517                writer_builder,
1518            } => {
1519                let close_result = match writer.take() {
1520                    Some(mut writer) => {
1521                        Some(writer.close().instrument_await("iceberg_close").await)
1522                    }
1523                    _ => None,
1524                };
1525                match writer_builder.clone().build() {
1526                    Ok(new_writer) => {
1527                        *writer = Some(Box::new(new_writer));
1528                    }
1529                    _ => {
1530                        // The old writer is already closed but building a new writer failed. We can't return the
1531                        // error here because the old writer may have closed successfully, so we just log it.
1532                        warn!("Failed to build new writer after close");
1533                    }
1534                }
1535                close_result
1536            }
1537            IcebergWriterDispatch::Upsert {
1538                writer,
1539                writer_builder,
1540                ..
1541            } => {
1542                let close_result = match writer.take() {
1543                    Some(mut writer) => {
1544                        Some(writer.close().instrument_await("iceberg_close").await)
1545                    }
1546                    _ => None,
1547                };
1548                match writer_builder.clone().build() {
1549                    Ok(new_writer) => {
1550                        *writer = Some(Box::new(new_writer));
1551                    }
1552                    _ => {
1553                        // The old writer is already closed but building a new writer failed. We can't return the
1554                        // error here because the old writer may have closed successfully, so we just log it.
1555                        warn!("Failed to build new writer after close");
1556                    }
1557                }
1558                close_result
1559            }
1560        };
1561
1562        match close_result {
1563            Some(Ok(result)) => {
1564                let format_version = inner.table.metadata().format_version();
1565                let partition_type = inner.table.metadata().default_partition_type();
1566                let data_files = result
1567                    .into_iter()
1568                    .map(|f| {
1569                        // Truncate large column statistics BEFORE serialization
1570                        let truncated = truncate_datafile(f);
1571                        SerializedDataFile::try_from(truncated, partition_type, format_version)
1572                            .map_err(|err| SinkError::Iceberg(anyhow!(err)))
1573                    })
1574                    .collect::<Result<Vec<_>>>()?;
1575                Ok(Some(SinkMetadata::try_from(&IcebergCommitResult {
1576                    data_files,
1577                    schema_id: inner.table.metadata().current_schema_id(),
1578                    partition_spec_id: inner.table.metadata().default_partition_spec_id(),
1579                })?))
1580            }
1581            Some(Err(err)) => Err(SinkError::Iceberg(anyhow!(err))),
1582            None => Err(SinkError::Iceberg(anyhow!("No writer to close"))),
1583        }
1584    }
1585}
1586
1587const SCHEMA_ID: &str = "schema_id";
1588const PARTITION_SPEC_ID: &str = "partition_spec_id";
1589const DATA_FILES: &str = "data_files";
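// Together these keys form the serialized commit metadata JSON; an illustrative shape:
// {
//     "schema_id": 0,
//     "partition_spec_id": 0,
//     "data_files": [ /* serialized `SerializedDataFile` entries */ ]
// }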
1590
1591/// Maximum size for column statistics (min/max values) in bytes.
1592/// Column statistics larger than this will be truncated to avoid metadata bloat.
1593/// This is especially important for large fields like JSONB, TEXT, BINARY, etc.
1594///
1595/// Large column statistics in `DataFile` metadata can cause OOM errors, so we truncate
1596/// them at the `DataFile` level (before serialization) by directly modifying the public
1597/// `lower_bounds` and `upper_bounds` fields.
1598///
1599/// This prevents metadata from ballooning to gigabytes when dealing with large
1600/// JSONB, TEXT, or BINARY fields, while still preserving statistics for small fields
1601/// that benefit from query optimization.
1602const MAX_COLUMN_STAT_SIZE: usize = 10240; // 10KB
1603
1604/// Truncate large column statistics from `DataFile` BEFORE serialization.
1605///
1606/// This function directly modifies `DataFile`'s `lower_bounds` and `upper_bounds`
1607/// to remove entries that exceed `MAX_COLUMN_STAT_SIZE`.
1608///
1609/// # Arguments
1610/// * `data_file` - A `DataFile` to process
1611///
1612/// # Returns
1613/// The modified `DataFile` with large statistics truncated
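///
/// # Example (illustrative sketch)
///
/// ```ignore
/// // A bound entry is kept only if its binary representation fits within the threshold:
/// let keep = datum.to_bytes().map(|b| b.len()).unwrap_or(0) <= MAX_COLUMN_STAT_SIZE;
/// ```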
1614fn truncate_datafile(mut data_file: DataFile) -> DataFile {
1615    // Process lower_bounds - remove entries with large values
1616    data_file.lower_bounds.retain(|field_id, datum| {
1617        // Use to_bytes() to get the actual binary size without JSON serialization overhead
1618        let size = match datum.to_bytes() {
1619            Ok(bytes) => bytes.len(),
1620            Err(_) => 0,
1621        };
1622
1623        if size > MAX_COLUMN_STAT_SIZE {
1624            tracing::debug!(
1625                field_id = field_id,
1626                size = size,
1627                "Truncating large lower_bound statistic"
1628            );
1629            return false;
1630        }
1631        true
1632    });
1633
1634    // Process upper_bounds - remove entries with large values
1635    data_file.upper_bounds.retain(|field_id, datum| {
1636        // Use to_bytes() to get the actual binary size without JSON serialization overhead
1637        let size = match datum.to_bytes() {
1638            Ok(bytes) => bytes.len(),
1639            Err(_) => 0,
1640        };
1641
1642        if size > MAX_COLUMN_STAT_SIZE {
1643            tracing::debug!(
1644                field_id = field_id,
1645                size = size,
1646                "Truncating large upper_bound statistic"
1647            );
1648            return false;
1649        }
1650        true
1651    });
1652
1653    data_file
1654}
1655
1656#[derive(Default, Clone)]
1657struct IcebergCommitResult {
1658    schema_id: i32,
1659    partition_spec_id: i32,
1660    data_files: Vec<SerializedDataFile>,
1661}
1662
1663impl IcebergCommitResult {
1664    fn try_from(value: &SinkMetadata) -> Result<Self> {
1665        if let Some(Serialized(v)) = &value.metadata {
1666            let mut values = if let serde_json::Value::Object(v) =
1667                serde_json::from_slice::<serde_json::Value>(&v.metadata)
1668                    .context("Can't parse iceberg sink metadata")?
1669            {
1670                v
1671            } else {
1672                bail!("iceberg sink metadata should be an object");
1673            };
1674
1675            let schema_id;
1676            if let Some(serde_json::Value::Number(value)) = values.remove(SCHEMA_ID) {
1677                schema_id = value
1678                    .as_u64()
1679                    .ok_or_else(|| anyhow!("schema_id should be a u64"))?;
1680            } else {
1681                bail!("iceberg sink metadata should have schema_id");
1682            }
1683
1684            let partition_spec_id;
1685            if let Some(serde_json::Value::Number(value)) = values.remove(PARTITION_SPEC_ID) {
1686                partition_spec_id = value
1687                    .as_u64()
1688                    .ok_or_else(|| anyhow!("partition_spec_id should be a u64"))?;
1689            } else {
1690                bail!("iceberg sink metadata should have partition_spec_id");
1691            }
1692
1693            let data_files: Vec<SerializedDataFile>;
1694            if let serde_json::Value::Array(values) = values
1695                .remove(DATA_FILES)
1696                .ok_or_else(|| anyhow!("iceberg sink metadata should have data_files object"))?
1697            {
1698                data_files = values
1699                    .into_iter()
1700                    .map(from_value::<SerializedDataFile>)
1701                    .collect::<std::result::Result<_, _>>()
1702                    .context("Can't parse data_files in iceberg sink metadata")?;
1703            } else {
1704                bail!("iceberg sink metadata should have data_files object");
1705            }
1706
1707            Ok(Self {
1708                schema_id: schema_id as i32,
1709                partition_spec_id: partition_spec_id as i32,
1710                data_files,
1711            })
1712        } else {
1713            bail!("Can't create iceberg sink write result from empty data!")
1714        }
1715    }
1716
1717    fn try_from_serialized_bytes(value: Vec<u8>) -> Result<Self> {
1718        let mut values = if let serde_json::Value::Object(value) =
1719            serde_json::from_slice::<serde_json::Value>(&value)
1720                .context("Can't parse iceberg sink metadata")?
1721        {
1722            value
1723        } else {
1724            bail!("iceberg sink metadata should be an object");
1725        };
1726
1727        let schema_id;
1728        if let Some(serde_json::Value::Number(value)) = values.remove(SCHEMA_ID) {
1729            schema_id = value
1730                .as_u64()
1731                .ok_or_else(|| anyhow!("schema_id should be a u64"))?;
1732        } else {
1733            bail!("iceberg sink metadata should have schema_id");
1734        }
1735
1736        let partition_spec_id;
1737        if let Some(serde_json::Value::Number(value)) = values.remove(PARTITION_SPEC_ID) {
1738            partition_spec_id = value
1739                .as_u64()
1740                .ok_or_else(|| anyhow!("partition_spec_id should be a u64"))?;
1741        } else {
1742            bail!("iceberg sink metadata should have partition_spec_id");
1743        }
1744
1745        let data_files: Vec<SerializedDataFile>;
1746        if let serde_json::Value::Array(values) = values
1747            .remove(DATA_FILES)
1748            .ok_or_else(|| anyhow!("iceberg sink metadata should have data_files object"))?
1749        {
1750            data_files = values
1751                .into_iter()
1752                .map(from_value::<SerializedDataFile>)
1753                .collect::<std::result::Result<_, _>>()
1754                .context("Can't parse data_files in iceberg sink metadata")?;
1755        } else {
1756            bail!("iceberg sink metadata should have data_files object");
1757        }
1758
1759        Ok(Self {
1760            schema_id: schema_id as i32,
1761            partition_spec_id: partition_spec_id as i32,
1762            data_files,
1763        })
1764    }
1765}
1766
1767impl<'a> TryFrom<&'a IcebergCommitResult> for SinkMetadata {
1768    type Error = SinkError;
1769
1770    fn try_from(value: &'a IcebergCommitResult) -> std::result::Result<SinkMetadata, Self::Error> {
1771        let json_data_files = serde_json::Value::Array(
1772            value
1773                .data_files
1774                .iter()
1775                .map(serde_json::to_value)
1776                .collect::<std::result::Result<Vec<serde_json::Value>, _>>()
1777                .context("Can't serialize data files to json")?,
1778        );
1779        let json_value = serde_json::Value::Object(
1780            vec![
1781                (
1782                    SCHEMA_ID.to_owned(),
1783                    serde_json::Value::Number(value.schema_id.into()),
1784                ),
1785                (
1786                    PARTITION_SPEC_ID.to_owned(),
1787                    serde_json::Value::Number(value.partition_spec_id.into()),
1788                ),
1789                (DATA_FILES.to_owned(), json_data_files),
1790            ]
1791            .into_iter()
1792            .collect(),
1793        );
1794        Ok(SinkMetadata {
1795            metadata: Some(Serialized(SerializedMetadata {
1796                metadata: serde_json::to_vec(&json_value)
1797                    .context("Can't serialize iceberg sink metadata")?,
1798            })),
1799        })
1800    }
1801}
1802
1803impl TryFrom<IcebergCommitResult> for Vec<u8> {
1804    type Error = SinkError;
1805
1806    fn try_from(value: IcebergCommitResult) -> std::result::Result<Vec<u8>, Self::Error> {
1807        let json_data_files = serde_json::Value::Array(
1808            value
1809                .data_files
1810                .iter()
1811                .map(serde_json::to_value)
1812                .collect::<std::result::Result<Vec<serde_json::Value>, _>>()
1813                .context("Can't serialize data files to json")?,
1814        );
1815        let json_value = serde_json::Value::Object(
1816            vec![
1817                (
1818                    SCHEMA_ID.to_owned(),
1819                    serde_json::Value::Number(value.schema_id.into()),
1820                ),
1821                (
1822                    PARTITION_SPEC_ID.to_owned(),
1823                    serde_json::Value::Number(value.partition_spec_id.into()),
1824                ),
1825                (DATA_FILES.to_owned(), json_data_files),
1826            ]
1827            .into_iter()
1828            .collect(),
1829        );
1830        Ok(serde_json::to_vec(&json_value).context("Can't serialize iceberg sink metadata")?)
1831    }
1832}

1833pub struct IcebergSinkCommitter {
1834    catalog: Arc<dyn Catalog>,
1835    table: Table,
1836    pub last_commit_epoch: u64,
1837    pub(crate) sink_id: SinkId,
1838    pub(crate) config: IcebergConfig,
1839    pub(crate) param: SinkParam,
1840    commit_retry_num: u32,
1841    pub(crate) iceberg_compact_stat_sender: Option<UnboundedSender<IcebergSinkCompactionUpdate>>,
1842}
1843
1844impl IcebergSinkCommitter {
1845    // Reload the table and guarantee that its current schema_id and partition_spec_id match
1846    // the given `schema_id` and `partition_spec_id`.
1847    async fn reload_table(
1848        catalog: &dyn Catalog,
1849        table_ident: &TableIdent,
1850        schema_id: i32,
1851        partition_spec_id: i32,
1852    ) -> Result<Table> {
1853        let table = catalog
1854            .load_table(table_ident)
1855            .await
1856            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
1857        if table.metadata().current_schema_id() != schema_id {
1858            return Err(SinkError::Iceberg(anyhow!(
1859                "Schema evolution not supported, expect schema id {}, but got {}",
1860                schema_id,
1861                table.metadata().current_schema_id()
1862            )));
1863        }
1864        if table.metadata().default_partition_spec_id() != partition_spec_id {
1865            return Err(SinkError::Iceberg(anyhow!(
1866                "Partition evolution not supported, expect partition spec id {}, but got {}",
1867                partition_spec_id,
1868                table.metadata().default_partition_spec_id()
1869            )));
1870        }
1871        Ok(table)
1872    }
1873}
1874
1875#[async_trait]
1876impl SinglePhaseCommitCoordinator for IcebergSinkCommitter {
1877    async fn init(&mut self) -> Result<()> {
1878        tracing::info!(
1879            sink_id = %self.param.sink_id,
1880            "Iceberg sink coordinator initialized",
1881        );
1882
1883        Ok(())
1884    }
1885
1886    async fn commit_data(&mut self, epoch: u64, metadata: Vec<SinkMetadata>) -> Result<()> {
1887        tracing::debug!("Starting iceberg direct commit in epoch {epoch}");
1888
1889        if metadata.is_empty() {
1890            tracing::debug!(?epoch, "No datafile to commit");
1891            return Ok(());
1892        }
1893
1894        // Commit data if present
1895        if let Some((write_results, snapshot_id)) = self.pre_commit_inner(epoch, metadata)? {
1896            self.commit_data_impl(epoch, write_results, snapshot_id)
1897                .await?;
1898        }
1899
1900        Ok(())
1901    }
1902
1903    async fn commit_schema_change(
1904        &mut self,
1905        epoch: u64,
1906        schema_change: PbSinkSchemaChange,
1907    ) -> Result<()> {
1908        tracing::info!(
1909            "Committing schema change {:?} in epoch {}",
1910            schema_change,
1911            epoch
1912        );
1913        self.commit_schema_change_impl(schema_change).await?;
1914        tracing::info!("Successfully committed schema change in epoch {}", epoch);
1915
1916        Ok(())
1917    }
1918}
1919
1920#[async_trait]
1921impl TwoPhaseCommitCoordinator for IcebergSinkCommitter {
1922    async fn init(&mut self) -> Result<()> {
1923        tracing::info!(
1924            sink_id = %self.param.sink_id,
1925            "Iceberg sink coordinator initialized",
1926        );
1927
1928        Ok(())
1929    }
1930
1931    async fn pre_commit(
1932        &mut self,
1933        epoch: u64,
1934        metadata: Vec<SinkMetadata>,
1935        _schema_change: Option<PbSinkSchemaChange>,
1936    ) -> Result<Option<Vec<u8>>> {
1937        tracing::debug!("Starting iceberg pre commit in epoch {epoch}");
1938
1939        let (write_results, snapshot_id) = match self.pre_commit_inner(epoch, metadata)? {
1940            Some((write_results, snapshot_id)) => (write_results, snapshot_id),
1941            None => {
1942                tracing::debug!(?epoch, "no data to pre commit");
1943                return Ok(None);
1944            }
1945        };
1946
1947        let mut write_results_bytes = Vec::new();
1948        for each_parallelism_write_result in write_results {
1949            let each_parallelism_write_result_bytes: Vec<u8> =
1950                each_parallelism_write_result.try_into()?;
1951            write_results_bytes.push(each_parallelism_write_result_bytes);
1952        }
1953
1954        let snapshot_id_bytes: Vec<u8> = snapshot_id.to_le_bytes().to_vec();
1955        write_results_bytes.push(snapshot_id_bytes);
1956
1957        let pre_commit_metadata_bytes: Vec<u8> = serialize_metadata(write_results_bytes);
1958        Ok(Some(pre_commit_metadata_bytes))
1959    }
1960
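    /// Illustrative layout of `commit_metadata` (as produced by `pre_commit` above): a
    /// JSON-encoded `Vec<Vec<u8>>` whose leading elements are serialized `IcebergCommitResult`s
    /// and whose final element is the `i64` snapshot id in little-endian bytes.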
1961    async fn commit_data(&mut self, epoch: u64, commit_metadata: Vec<u8>) -> Result<()> {
1962        tracing::debug!("Starting iceberg commit in epoch {epoch}");
1963
1964        if commit_metadata.is_empty() {
1965            tracing::debug!(?epoch, "No datafile to commit");
1966            return Ok(());
1967        }
1968
1969        // Deserialize commit metadata
1970        let mut payload = deserialize_metadata(commit_metadata);
1971        if payload.is_empty() {
1972            return Err(SinkError::Iceberg(anyhow!(
1973                "Invalid commit metadata: empty payload"
1974            )));
1975        }
1976
1977        // Last element is snapshot_id
1978        let snapshot_id_bytes = payload.pop().ok_or_else(|| {
1979            SinkError::Iceberg(anyhow!("Invalid commit metadata: missing snapshot_id"))
1980        })?;
1981        let snapshot_id = i64::from_le_bytes(
1982            snapshot_id_bytes
1983                .try_into()
1984                .map_err(|_| SinkError::Iceberg(anyhow!("Invalid snapshot id bytes")))?,
1985        );
1986
1987        // Remaining elements are write_results
1988        let write_results = payload
1989            .into_iter()
1990            .map(IcebergCommitResult::try_from_serialized_bytes)
1991            .collect::<Result<Vec<_>>>()?;
1992
1993        let snapshot_committed = self
1994            .is_snapshot_id_in_iceberg(&self.config, snapshot_id)
1995            .await?;
1996
1997        if snapshot_committed {
1998            tracing::info!(
1999                "Snapshot id {} already committed in iceberg table, skip committing again.",
2000                snapshot_id
2001            );
2002            return Ok(());
2003        }
2004
2005        self.commit_data_impl(epoch, write_results, snapshot_id)
2006            .await
2007    }
2008
2009    async fn commit_schema_change(
2010        &mut self,
2011        epoch: u64,
2012        schema_change: PbSinkSchemaChange,
2013    ) -> Result<()> {
2014        let schema_updated = self.check_schema_change_applied(&schema_change)?;
2015        if schema_updated {
2016            tracing::info!("Schema change already committed in epoch {}, skip", epoch);
2017            return Ok(());
2018        }
2019
2020        tracing::info!(
2021            "Committing schema change {:?} in epoch {}",
2022            schema_change,
2023            epoch
2024        );
2025        self.commit_schema_change_impl(schema_change).await?;
2026        tracing::info!("Successfully committed schema change in epoch {epoch}");
2027
2028        Ok(())
2029    }
2030
2031    async fn abort(&mut self, _epoch: u64, _commit_metadata: Vec<u8>) {
2032        // TODO: Files that have been written but not committed should be deleted.
2033        tracing::debug!("Abort not implemented yet");
2034    }
2035}
2036
2037/// Methods required to achieve exactly-once semantics.
2038impl IcebergSinkCommitter {
2039    fn pre_commit_inner(
2040        &mut self,
2041        _epoch: u64,
2042        metadata: Vec<SinkMetadata>,
2043    ) -> Result<Option<(Vec<IcebergCommitResult>, i64)>> {
2044        let write_results: Vec<IcebergCommitResult> = metadata
2045            .iter()
2046            .map(IcebergCommitResult::try_from)
2047            .collect::<Result<Vec<IcebergCommitResult>>>()?;
2048
2049        // Skip if no data to commit
2050        if write_results.is_empty() || write_results.iter().all(|r| r.data_files.is_empty()) {
2051            return Ok(None);
2052        }
2053
2054        let expect_schema_id = write_results[0].schema_id;
2055        let expect_partition_spec_id = write_results[0].partition_spec_id;
2056
2057        // Guarantee that all write results have the same schema_id and partition_spec_id.
2058        if write_results
2059            .iter()
2060            .any(|r| r.schema_id != expect_schema_id)
2061            || write_results
2062                .iter()
2063                .any(|r| r.partition_spec_id != expect_partition_spec_id)
2064        {
2065            return Err(SinkError::Iceberg(anyhow!(
2066                "schema_id and partition_spec_id should be the same in all write results"
2067            )));
2068        }
2069
2070        let snapshot_id = FastAppendAction::generate_snapshot_id(&self.table);
2071
2072        Ok(Some((write_results, snapshot_id)))
2073    }
2074
2075    async fn commit_data_impl(
2076        &mut self,
2077        epoch: u64,
2078        write_results: Vec<IcebergCommitResult>,
2079        snapshot_id: i64,
2080    ) -> Result<()> {
2081        // Empty write results should be handled before calling this function.
2082        assert!(
2083            !write_results.is_empty() && !write_results.iter().all(|r| r.data_files.is_empty())
2084        );
2085
2086        // Check snapshot limit before proceeding with commit
2087        self.wait_for_snapshot_limit().await?;
2088
2089        let expect_schema_id = write_results[0].schema_id;
2090        let expect_partition_spec_id = write_results[0].partition_spec_id;
2091
2092        // Load the latest table metadata on a best-effort basis to avoid concurrent modification.
2093        self.table = Self::reload_table(
2094            self.catalog.as_ref(),
2095            self.table.identifier(),
2096            expect_schema_id,
2097            expect_partition_spec_id,
2098        )
2099        .await?;
2100
2101        let Some(schema) = self.table.metadata().schema_by_id(expect_schema_id) else {
2102            return Err(SinkError::Iceberg(anyhow!(
2103                "Can't find schema by id {}",
2104                expect_schema_id
2105            )));
2106        };
2107        let Some(partition_spec) = self
2108            .table
2109            .metadata()
2110            .partition_spec_by_id(expect_partition_spec_id)
2111        else {
2112            return Err(SinkError::Iceberg(anyhow!(
2113                "Can't find partition spec by id {}",
2114                expect_partition_spec_id
2115            )));
2116        };
2117        let partition_type = partition_spec
2118            .as_ref()
2119            .clone()
2120            .partition_type(schema)
2121            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
2122
2123        let data_files = write_results
2124            .into_iter()
2125            .flat_map(|r| {
2126                r.data_files.into_iter().map(|f| {
2127                    f.try_into(expect_partition_spec_id, &partition_type, schema)
2128                        .map_err(|err| SinkError::Iceberg(anyhow!(err)))
2129                })
2130            })
2131            .collect::<Result<Vec<DataFile>>>()?;
2132        // TODO:
2133        // This retry behavior should be reverted and handled in iceberg-rust once it supports
2134        // retry (tracked in: https://github.com/apache/iceberg-rust/issues/964), since proper retry
2135        // logic involves reapplying the commit metadata. For now, we just retry the commit operation.
2136        let retry_strategy = ExponentialBackoff::from_millis(10)
2137            .max_delay(Duration::from_secs(60))
2138            .map(jitter)
2139            .take(self.commit_retry_num as usize);
2140        let catalog = self.catalog.clone();
2141        let table_ident = self.table.identifier().clone();
2142
2143        // Custom retry logic that:
2144        // 1. Calls reload_table before each commit attempt to get the latest metadata
2145        // 2. If reload_table fails (table does not exist, or schema/partition mismatch), stops retrying immediately
2146        // 3. If commit fails, retries with backoff
2147        enum CommitError {
2148            ReloadTable(SinkError), // Non-retriable: schema/partition mismatch
2149            Commit(SinkError),      // Retriable: commit conflicts, network errors
2150        }
2151
2152        let table = RetryIf::spawn(
2153            retry_strategy,
2154            || async {
2155                // Reload table before each commit attempt to get the latest metadata
2156                let table = Self::reload_table(
2157                    catalog.as_ref(),
2158                    &table_ident,
2159                    expect_schema_id,
2160                    expect_partition_spec_id,
2161                )
2162                .await
2163                .map_err(|e| {
2164                    tracing::error!(error = %e.as_report(), "Failed to reload iceberg table");
2165                    CommitError::ReloadTable(e)
2166                })?;
2167
2168                let txn = Transaction::new(&table);
2169                let append_action = txn
2170                    .fast_append()
2171                    .set_snapshot_id(snapshot_id)
2172                    .set_target_branch(commit_branch(
2173                        self.config.r#type.as_str(),
2174                        self.config.write_mode,
2175                    ))
2176                    .add_data_files(data_files.clone());
2177
2178                let tx = append_action.apply(txn).map_err(|err| {
2179                    let err: IcebergError = err.into();
2180                    tracing::error!(error = %err.as_report(), "Failed to apply append action to iceberg transaction");
2181                    CommitError::Commit(SinkError::Iceberg(anyhow!(err)))
2182                })?;
2183
2184                tx.commit(catalog.as_ref()).await.map_err(|err| {
2185                    let err: IcebergError = err.into();
2186                    tracing::error!(error = %err.as_report(), "Failed to commit iceberg table");
2187                    CommitError::Commit(SinkError::Iceberg(anyhow!(err)))
2188                })
2189            },
2190            |err: &CommitError| {
2191                // Only retry on commit errors, not on reload_table errors
2192                match err {
2193                    CommitError::Commit(_) => {
2194                        tracing::warn!("Commit failed, will retry");
2195                        true
2196                    }
2197                    CommitError::ReloadTable(_) => {
2198                        tracing::error!(
2199                            "reload_table failed with non-retriable error, will not retry"
2200                        );
2201                        false
2202                    }
2203                }
2204            },
2205        )
2206        .await
2207        .map_err(|e| match e {
2208            CommitError::ReloadTable(e) | CommitError::Commit(e) => e,
2209        })?;
2210        self.table = table;
2211
2212        let snapshot_num = self.table.metadata().snapshots().count();
2213        let catalog_name = self.config.common.catalog_name();
2214        let table_name = self.table.identifier().to_string();
2215        let metrics_labels = [&self.param.sink_name, &catalog_name, &table_name];
2216        GLOBAL_SINK_METRICS
2217            .iceberg_snapshot_num
2218            .with_guarded_label_values(&metrics_labels)
2219            .set(snapshot_num as i64);
2220
2221        tracing::debug!("Successfully committed to iceberg table in epoch {epoch}.");
2222
2223        if let Some(iceberg_compact_stat_sender) = &self.iceberg_compact_stat_sender
2224            && self.config.enable_compaction
2225            && iceberg_compact_stat_sender
2226                .send(IcebergSinkCompactionUpdate {
2227                    sink_id: self.sink_id,
2228                    compaction_interval: self.config.compaction_interval_sec(),
2229                    force_compaction: false,
2230                })
2231                .is_err()
2232        {
2233            warn!("failed to send iceberg compaction stats");
2234        }
2235
2236        Ok(())
2237    }
2238
2239    /// During pre-commit, we record the `snapshot_id` corresponding to each batch of files in the
2240    /// metadata. Therefore, checking whether all files in this batch are present in Iceberg reduces
2241    /// to verifying that their corresponding `snapshot_id` exists in Iceberg.
2242    async fn is_snapshot_id_in_iceberg(
2243        &self,
2244        iceberg_config: &IcebergConfig,
2245        snapshot_id: i64,
2246    ) -> Result<bool> {
2247        let table = iceberg_config.load_table().await?;
2248        Ok(table.metadata().snapshot_by_id(snapshot_id).is_some())
2253    }
2254
2255    /// Check if the specified columns already exist in the iceberg table's current schema.
2256    /// This is used to determine if schema change has already been applied.
2257    fn check_schema_change_applied(&self, schema_change: &PbSinkSchemaChange) -> Result<bool> {
2258        let current_schema = self.table.metadata().current_schema();
2259        let current_arrow_schema = schema_to_arrow_schema(current_schema.as_ref())
2260            .context("Failed to convert schema")
2261            .map_err(SinkError::Iceberg)?;
2262
2263        let iceberg_arrow_convert = IcebergArrowConvert;
2264
2265        let schema_matches = |expected: &[ArrowField]| {
2266            if current_arrow_schema.fields().len() != expected.len() {
2267                return false;
2268            }
2269
2270            expected.iter().all(|expected_field| {
2271                current_arrow_schema.fields().iter().any(|current_field| {
2272                    current_field.name() == expected_field.name()
2273                        && current_field.data_type() == expected_field.data_type()
2274                })
2275            })
2276        };
2277
2278        let original_arrow_fields: Vec<ArrowField> = schema_change
2279            .original_schema
2280            .iter()
2281            .map(|pb_field| {
2282                let field = Field::from(pb_field);
2283                iceberg_arrow_convert
2284                    .to_arrow_field(&field.name, &field.data_type)
2285                    .context("Failed to convert field to arrow")
2286                    .map_err(SinkError::Iceberg)
2287            })
2288            .collect::<Result<_>>()?;
2289
2290        // If current schema equals original_schema, then schema change is NOT applied.
2291        if schema_matches(&original_arrow_fields) {
2292            tracing::debug!(
2293                "Current iceberg schema matches original_schema ({} columns); schema change not applied",
2294                original_arrow_fields.len()
2295            );
2296            return Ok(false);
2297        }
2298
2299        // We only support add_columns for now.
2300        let Some(risingwave_pb::stream_plan::sink_schema_change::Op::AddColumns(add_columns_op)) =
2301            schema_change.op.as_ref()
2302        else {
2303            return Err(SinkError::Iceberg(anyhow!(
2304                "Unsupported sink schema change op in iceberg sink: {:?}",
2305                schema_change.op
2306            )));
2307        };
2308
2309        let add_arrow_fields: Vec<ArrowField> = add_columns_op
2310            .fields
2311            .iter()
2312            .map(|pb_field| {
2313                let field = Field::from(pb_field);
2314                iceberg_arrow_convert
2315                    .to_arrow_field(&field.name, &field.data_type)
2316                    .context("Failed to convert field to arrow")
2317                    .map_err(SinkError::Iceberg)
2318            })
2319            .collect::<Result<_>>()?;
2320
2321        let mut expected_after_change = original_arrow_fields;
2322        expected_after_change.extend(add_arrow_fields);
2323
2324        // If current schema equals original_schema + add_columns, then schema change is applied.
2325        if schema_matches(&expected_after_change) {
2326            tracing::debug!(
2327                "Current iceberg schema matches original_schema + add_columns ({} columns); schema change already applied",
2328                expected_after_change.len()
2329            );
2330            return Ok(true);
2331        }
2332
2333        Err(SinkError::Iceberg(anyhow!(
2334            "Current iceberg schema does not match either original_schema ({} cols) or original_schema + add_columns; cannot determine whether schema change is applied",
2335            schema_change.original_schema.len()
2336        )))
2337    }
2338
2339    /// Commit schema changes (e.g., add columns) to the iceberg table.
2340    /// This function uses Transaction API to atomically update the table schema
2341    /// with optimistic locking to prevent concurrent conflicts.
2342    async fn commit_schema_change_impl(&mut self, schema_change: PbSinkSchemaChange) -> Result<()> {
2343        use iceberg::spec::NestedField;
2344
2345        let Some(risingwave_pb::stream_plan::sink_schema_change::Op::AddColumns(add_columns_op)) =
2346            schema_change.op.as_ref()
2347        else {
2348            return Err(SinkError::Iceberg(anyhow!(
2349                "Unsupported sink schema change op in iceberg sink: {:?}",
2350                schema_change.op
2351            )));
2352        };
2353
2354        let add_columns = add_columns_op.fields.iter().map(Field::from).collect_vec();
2355
2356        // Step 1: Get current table metadata
2357        let metadata = self.table.metadata();
2358        let mut next_field_id = metadata.last_column_id() + 1;
2359        tracing::debug!("Starting schema change, next_field_id: {}", next_field_id);
2360
2361        // Step 2: Build new fields to add
2362        let iceberg_create_table_arrow_convert = IcebergCreateTableArrowConvert::default();
2363        let mut new_fields = Vec::new();
2364
2365        for field in &add_columns {
2366            // Convert RisingWave Field to Arrow Field using IcebergCreateTableArrowConvert
2367            let arrow_field = iceberg_create_table_arrow_convert
2368                .to_arrow_field(&field.name, &field.data_type)
2369                .with_context(|| format!("Failed to convert field '{}' to arrow", field.name))
2370                .map_err(SinkError::Iceberg)?;
2371
2372            // Convert Arrow DataType to Iceberg Type
2373            let iceberg_type = iceberg::arrow::arrow_type_to_type(arrow_field.data_type())
2374                .map_err(|err| {
2375                    SinkError::Iceberg(
2376                        anyhow!(err).context("Failed to convert Arrow type to Iceberg type"),
2377                    )
2378                })?;
2379
2380            // Create NestedField with the next available field ID
2381            let nested_field = Arc::new(NestedField::optional(
2382                next_field_id,
2383                &field.name,
2384                iceberg_type,
2385            ));
2386
2387            new_fields.push(nested_field);
2388            tracing::info!("Prepared field '{}' with ID {}", field.name, next_field_id);
2389            next_field_id += 1;
2390        }
2391
2392        // Step 3: Create Transaction with UpdateSchemaAction
2393        tracing::info!(
2394            "Committing schema change to catalog for table {}",
2395            self.table.identifier()
2396        );
2397
2398        let txn = Transaction::new(&self.table);
2399        let action = txn.update_schema().add_fields(new_fields);
2400
2401        let updated_table = action
2402            .apply(txn)
2403            .context("Failed to apply schema update action")
2404            .map_err(SinkError::Iceberg)?
2405            .commit(self.catalog.as_ref())
2406            .await
2407            .context("Failed to commit table schema change")
2408            .map_err(SinkError::Iceberg)?;
2409
2410        self.table = updated_table;
2411
2412        tracing::info!(
2413            "Successfully committed schema change, added {} columns to iceberg table",
2414            add_columns.len()
2415        );
2416
2417        Ok(())
2418    }
2419
2420    /// Count the snapshots created since the last rewrite/overwrite (`Replace`) operation.
2421    /// The caller compares the returned count against the configured limit.
2422    fn count_snapshots_since_rewrite(&self) -> usize {
2423        let mut snapshots: Vec<_> = self.table.metadata().snapshots().collect();
2424        snapshots.sort_by_key(|b| std::cmp::Reverse(b.timestamp_ms()));
2425
2426        // Walk snapshots from newest to oldest, counting until the most recent rewrite/overwrite.
2427        let mut count = 0;
2428        for snapshot in snapshots {
2429            // Check if this snapshot represents a rewrite or overwrite operation
2430            let summary = snapshot.summary();
2431            match &summary.operation {
2432                Operation::Replace => {
2433                    // Found a rewrite/overwrite operation, stop counting
2434                    break;
2435                }
2436
2437                _ => {
2438                    // Increment count for each snapshot that is not a rewrite/overwrite
2439                    count += 1;
2440                }
2441            }
2442        }
2443
2444        count
2445    }
2446
2447    /// Wait until snapshot count since last rewrite is below the limit
2448    async fn wait_for_snapshot_limit(&mut self) -> Result<()> {
2449        if !self.config.enable_compaction {
2450            return Ok(());
2451        }
2452
2453        if let Some(max_snapshots) = self.config.max_snapshots_num_before_compaction {
2454            loop {
2455                let current_count = self.count_snapshots_since_rewrite();
2456
2457                if current_count < max_snapshots {
2458                    tracing::info!(
2459                        "Snapshot count check passed: {} < {}",
2460                        current_count,
2461                        max_snapshots
2462                    );
2463                    break;
2464                }
2465
2466                tracing::info!(
2467                    "Snapshot count {} exceeds limit {}, waiting...",
2468                    current_count,
2469                    max_snapshots
2470                );
2471
2472                if let Some(iceberg_compact_stat_sender) = &self.iceberg_compact_stat_sender
2473                    && iceberg_compact_stat_sender
2474                        .send(IcebergSinkCompactionUpdate {
2475                            sink_id: self.sink_id,
2476                            compaction_interval: self.config.compaction_interval_sec(),
2477                            force_compaction: true,
2478                        })
2479                        .is_err()
2480                {
2481                    tracing::warn!("failed to send iceberg compaction stats");
2482                }
2483
2484                // Wait for 30 seconds before checking again
2485                tokio::time::sleep(Duration::from_secs(30)).await;
2486
2487                // Reload table to get latest snapshots
2488                self.table = self.config.load_table().await?;
2489            }
2490        }
2491        Ok(())
2492    }
2493}
2494
2495const MAP_KEY: &str = "key";
2496const MAP_VALUE: &str = "value";
2497
2498fn get_fields<'a>(
2499    our_field_type: &'a risingwave_common::types::DataType,
2500    data_type: &ArrowDataType,
2501    schema_fields: &mut HashMap<&'a str, &'a risingwave_common::types::DataType>,
2502) -> Option<ArrowFields> {
2503    match data_type {
2504        ArrowDataType::Struct(fields) => {
2505            match our_field_type {
2506                risingwave_common::types::DataType::Struct(struct_fields) => {
2507                    struct_fields.iter().for_each(|(name, data_type)| {
2508                        let res = schema_fields.insert(name, data_type);
2509                        // This assert is to make sure there is no duplicate field name in the schema.
2510                        assert!(res.is_none())
2511                    });
2512                }
2513                risingwave_common::types::DataType::Map(map_fields) => {
2514                    schema_fields.insert(MAP_KEY, map_fields.key());
2515                    schema_fields.insert(MAP_VALUE, map_fields.value());
2516                }
2517                risingwave_common::types::DataType::List(list) => {
2518                    list.elem()
2519                        .as_struct()
2520                        .iter()
2521                        .for_each(|(name, data_type)| {
2522                            let res = schema_fields.insert(name, data_type);
2523                            // This assert is to make sure there is no duplicate field name in the schema.
2524                            assert!(res.is_none())
2525                        });
2526                }
2527                _ => {}
2528            };
2529            Some(fields.clone())
2530        }
2531        ArrowDataType::List(field) | ArrowDataType::Map(field, _) => {
2532            get_fields(our_field_type, field.data_type(), schema_fields)
2533        }
2534        _ => None, // not a supported complex type and unlikely to show up
2535    }
2536}
2537
2538fn check_compatibility(
2539    schema_fields: HashMap<&str, &risingwave_common::types::DataType>,
2540    fields: &ArrowFields,
2541) -> anyhow::Result<bool> {
2542    for arrow_field in fields {
2543        let our_field_type = schema_fields
2544            .get(arrow_field.name().as_str())
2545            .ok_or_else(|| anyhow!("Field {} not found in our schema", arrow_field.name()))?;
2546
2547        // Iceberg source should be able to read iceberg decimal type.
2548        let converted_arrow_data_type = IcebergArrowConvert
2549            .to_arrow_field("", our_field_type)
2550            .map_err(|e| anyhow!(e))?
2551            .data_type()
2552            .clone();
2553
2554        let compatible = match (&converted_arrow_data_type, arrow_field.data_type()) {
2555            (ArrowDataType::Decimal128(_, _), ArrowDataType::Decimal128(_, _)) => true,
2556            (ArrowDataType::Binary, ArrowDataType::LargeBinary) => true,
2557            (ArrowDataType::LargeBinary, ArrowDataType::Binary) => true,
2558            (ArrowDataType::List(_), ArrowDataType::List(field))
2559            | (ArrowDataType::Map(_, _), ArrowDataType::Map(field, _)) => {
2560                let mut schema_fields = HashMap::new();
2561                get_fields(our_field_type, field.data_type(), &mut schema_fields)
2562                    .is_none_or(|fields| check_compatibility(schema_fields, &fields).unwrap())
2563            }
2564            // validate nested structs
2565            (ArrowDataType::Struct(_), ArrowDataType::Struct(fields)) => {
2566                let mut schema_fields = HashMap::new();
2567                our_field_type
2568                    .as_struct()
2569                    .iter()
2570                    .for_each(|(name, data_type)| {
2571                        let res = schema_fields.insert(name, data_type);
2572                        // This assert is to make sure there is no duplicate field name in the schema.
2573                        assert!(res.is_none())
2574                    });
2575                check_compatibility(schema_fields, fields)?
2576            }
2577            // cases where left != right (metadata, field name mismatch)
2578            //
2579            // all nested types: in iceberg `field_id` will always be present, but RW doesn't have it:
2580            // {"PARQUET:field_id": ".."}
2581            //
2582            // map: The standard name in arrow is "entries", "key", "value".
2583            // in iceberg-rs, it's called "key_value"
2584            (left, right) => left.equals_datatype(right),
2585        };
2586        if !compatible {
2587            bail!(
2588                "field {}'s type is incompatible\nRisingWave converted data type: {}\niceberg's data type: {}",
2589                arrow_field.name(),
2590                converted_arrow_data_type,
2591                arrow_field.data_type()
2592            );
2593        }
2594    }
2595    Ok(true)
2596}
2597
2598/// Try to match our schema with the iceberg schema.
2599pub fn try_matches_arrow_schema(rw_schema: &Schema, arrow_schema: &ArrowSchema) -> Result<()> {
2600    if rw_schema.fields.len() != arrow_schema.fields().len() {
2601        bail!(
2602            "Schema length mismatch, risingwave is {}, and iceberg is {}",
2603            rw_schema.fields.len(),
2604            arrow_schema.fields.len()
2605        );
2606    }
2607
2608    let mut schema_fields = HashMap::new();
2609    rw_schema.fields.iter().for_each(|field| {
2610        let res = schema_fields.insert(field.name.as_str(), &field.data_type);
2611        // This assert is to make sure there is no duplicate field name in the schema.
2612        assert!(res.is_none())
2613    });
2614
2615    check_compatibility(schema_fields, &arrow_schema.fields)?;
2616    Ok(())
2617}
2618
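/// Encode the per-parallelism payloads as a JSON array of byte arrays. `deserialize_metadata`
/// below is its inverse (illustratively, `deserialize_metadata(serialize_metadata(v)) == v`).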
2619fn serialize_metadata(metadata: Vec<Vec<u8>>) -> Vec<u8> {
2620    serde_json::to_vec(&metadata).unwrap()
2621}
2622
2623fn deserialize_metadata(bytes: Vec<u8>) -> Vec<Vec<u8>> {
2624    serde_json::from_slice(&bytes).unwrap()
2625}
2626
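/// Parse a comma-separated partition-by expression into `(column, Transform)` pairs.
///
/// Illustrative sketch of the expected mapping (not compiled here):
///
/// ```ignore
/// // "id, bucket(16, user_id), year(ts)" parses to:
/// // [("id", Transform::Identity),
/// //  ("user_id", Transform::Bucket(16)),
/// //  ("ts", Transform::Year)]
/// let parts = parse_partition_by_exprs("id, bucket(16, user_id), year(ts)".to_owned())?;
/// ```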
2627pub fn parse_partition_by_exprs(
2628    expr: String,
2629) -> std::result::Result<Vec<(String, Transform)>, anyhow::Error> {
2630    // captures column, transform(column), transform(n,column), transform(n, column)
2631    let re = Regex::new(r"(?<transform>\w+)(\(((?<n>\d+)?(?:,|(,\s)))?(?<field>\w+)\))?").unwrap();
2632    if !re.is_match(&expr) {
2633        bail!(format!(
2634            "Invalid partition fields: {}\nHINT: Supported formats are column, transform(column), transform(n,column), transform(n, column)",
2635            expr
2636        ))
2637    }
2638    let caps = re.captures_iter(&expr);
2639
2640    let mut partition_columns = vec![];
2641
2642    for mat in caps {
2643        let (column, transform) = if mat.name("n").is_none() && mat.name("field").is_none() {
2644            (&mat["transform"], Transform::Identity)
2645        } else {
2646            let mut func = mat["transform"].to_owned();
2647            if func == "bucket" || func == "truncate" {
2648                let n = &mat
2649                    .name("n")
2650                    .ok_or_else(|| anyhow!("The `n` must be set with `bucket` and `truncate`"))?
2651                    .as_str();
2652                func = format!("{func}[{n}]");
2653            }
2654            (
2655                &mat["field"],
2656                Transform::from_str(&func)
2657                    .with_context(|| format!("invalid transform function {}", func))?,
2658            )
2659        };
2660        partition_columns.push((column.to_owned(), transform));
2661    }
2662    Ok(partition_columns)
2663}
2664
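/// Pick the branch to commit to: upsert sinks running in copy-on-write mode commit to
/// `ICEBERG_COW_BRANCH`; every other combination commits to `MAIN_BRANCH`.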
2665pub fn commit_branch(sink_type: &str, write_mode: IcebergWriteMode) -> String {
2666    if should_enable_iceberg_cow(sink_type, write_mode) {
2667        ICEBERG_COW_BRANCH.to_owned()
2668    } else {
2669        MAIN_BRANCH.to_owned()
2670    }
2671}
2672
2673pub fn should_enable_iceberg_cow(sink_type: &str, write_mode: IcebergWriteMode) -> bool {
2674    sink_type == SINK_TYPE_UPSERT && write_mode == IcebergWriteMode::CopyOnWrite
2675}
2676
2677impl crate::with_options::WithOptions for IcebergWriteMode {}
2678
2679impl crate::with_options::WithOptions for CompactionType {}
2680
2681#[cfg(test)]
2682mod test {
2683    use std::collections::BTreeMap;
2684
2685    use risingwave_common::array::arrow::arrow_schema_iceberg::FieldRef as ArrowFieldRef;
2686    use risingwave_common::types::{DataType, MapType, StructType};
2687
2688    use crate::connector_common::{IcebergCommon, IcebergTableIdentifier};
2689    use crate::sink::decouple_checkpoint_log_sink::ICEBERG_DEFAULT_COMMIT_CHECKPOINT_INTERVAL;
2690    use crate::sink::iceberg::{
2691        COMPACTION_INTERVAL_SEC, COMPACTION_MAX_SNAPSHOTS_NUM, CompactionType, ENABLE_COMPACTION,
2692        ENABLE_SNAPSHOT_EXPIRATION, IcebergConfig, IcebergWriteMode,
2693        SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_FILES, SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_META_DATA,
2694        SNAPSHOT_EXPIRATION_MAX_AGE_MILLIS, SNAPSHOT_EXPIRATION_RETAIN_LAST, WRITE_MODE,
2695    };
2696
2697    pub const DEFAULT_ICEBERG_COMPACTION_INTERVAL: u64 = 3600; // 1 hour
2698
    #[test]
    fn test_compatible_arrow_schema() {
        use arrow_schema_iceberg::{DataType as ArrowDataType, Field as ArrowField};

        use super::*;
        let risingwave_schema = Schema::new(vec![
            Field::with_name(DataType::Int32, "a"),
            Field::with_name(DataType::Int32, "b"),
            Field::with_name(DataType::Int32, "c"),
        ]);
        let arrow_schema = ArrowSchema::new(vec![
            ArrowField::new("a", ArrowDataType::Int32, false),
            ArrowField::new("b", ArrowDataType::Int32, false),
            ArrowField::new("c", ArrowDataType::Int32, false),
        ]);

        try_matches_arrow_schema(&risingwave_schema, &arrow_schema).unwrap();

        let risingwave_schema = Schema::new(vec![
            Field::with_name(DataType::Int32, "d"),
            Field::with_name(DataType::Int32, "c"),
            Field::with_name(DataType::Int32, "a"),
            Field::with_name(DataType::Int32, "b"),
        ]);
        let arrow_schema = ArrowSchema::new(vec![
            ArrowField::new("a", ArrowDataType::Int32, false),
            ArrowField::new("b", ArrowDataType::Int32, false),
            ArrowField::new("d", ArrowDataType::Int32, false),
            ArrowField::new("c", ArrowDataType::Int32, false),
        ]);
        try_matches_arrow_schema(&risingwave_schema, &arrow_schema).unwrap();

        let risingwave_schema = Schema::new(vec![
            Field::with_name(
                DataType::Struct(StructType::new(vec![
                    ("a1", DataType::Int32),
                    (
                        "a2",
                        DataType::Struct(StructType::new(vec![
                            ("a21", DataType::Bytea),
                            (
                                "a22",
                                DataType::Map(MapType::from_kv(DataType::Varchar, DataType::Jsonb)),
                            ),
                        ])),
                    ),
                ])),
                "a",
            ),
            Field::with_name(
                DataType::list(DataType::Struct(StructType::new(vec![
                    ("b1", DataType::Int32),
                    ("b2", DataType::Bytea),
                    (
                        "b3",
                        DataType::Map(MapType::from_kv(DataType::Varchar, DataType::Jsonb)),
                    ),
                ]))),
                "b",
            ),
            Field::with_name(
                DataType::Map(MapType::from_kv(
                    DataType::Varchar,
                    DataType::list(DataType::Struct(StructType::new([
                        ("c1", DataType::Int32),
                        ("c2", DataType::Bytea),
                        (
                            "c3",
                            DataType::Map(MapType::from_kv(DataType::Varchar, DataType::Jsonb)),
                        ),
                    ]))),
                )),
                "c",
            ),
        ]);
        let arrow_schema = ArrowSchema::new(vec![
            ArrowField::new(
                "a",
                ArrowDataType::Struct(ArrowFields::from(vec![
                    ArrowField::new("a1", ArrowDataType::Int32, false),
                    ArrowField::new(
                        "a2",
                        ArrowDataType::Struct(ArrowFields::from(vec![
                            ArrowField::new("a21", ArrowDataType::LargeBinary, false),
                            ArrowField::new_map(
                                "a22",
                                "entries",
                                ArrowFieldRef::new(ArrowField::new(
                                    "key",
                                    ArrowDataType::Utf8,
                                    false,
                                )),
                                ArrowFieldRef::new(ArrowField::new(
                                    "value",
                                    ArrowDataType::Utf8,
                                    false,
                                )),
                                false,
                                false,
                            ),
                        ])),
                        false,
                    ),
                ])),
                false,
            ),
            ArrowField::new(
                "b",
                ArrowDataType::List(ArrowFieldRef::new(ArrowField::new_list_field(
                    ArrowDataType::Struct(ArrowFields::from(vec![
                        ArrowField::new("b1", ArrowDataType::Int32, false),
                        ArrowField::new("b2", ArrowDataType::LargeBinary, false),
                        ArrowField::new_map(
                            "b3",
                            "entries",
                            ArrowFieldRef::new(ArrowField::new("key", ArrowDataType::Utf8, false)),
                            ArrowFieldRef::new(ArrowField::new(
                                "value",
                                ArrowDataType::Utf8,
                                false,
                            )),
                            false,
                            false,
                        ),
                    ])),
                    false,
                ))),
                false,
            ),
            ArrowField::new_map(
                "c",
                "entries",
                ArrowFieldRef::new(ArrowField::new("key", ArrowDataType::Utf8, false)),
                ArrowFieldRef::new(ArrowField::new(
                    "value",
                    ArrowDataType::List(ArrowFieldRef::new(ArrowField::new_list_field(
                        ArrowDataType::Struct(ArrowFields::from(vec![
                            ArrowField::new("c1", ArrowDataType::Int32, false),
                            ArrowField::new("c2", ArrowDataType::LargeBinary, false),
                            ArrowField::new_map(
                                "c3",
                                "entries",
                                ArrowFieldRef::new(ArrowField::new(
                                    "key",
                                    ArrowDataType::Utf8,
                                    false,
                                )),
                                ArrowFieldRef::new(ArrowField::new(
                                    "value",
                                    ArrowDataType::Utf8,
                                    false,
                                )),
                                false,
                                false,
                            ),
                        ])),
                        false,
                    ))),
                    false,
                )),
                false,
                false,
            ),
        ]);
        try_matches_arrow_schema(&risingwave_schema, &arrow_schema).unwrap();
    }

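    /// Round-trips a full option map through `IcebergConfig::from_btreemap` and checks that
    /// every field, including the `catalog.jdbc.*` passthrough properties and the
    /// compaction/snapshot-expiration flags, lands in the expected struct.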
    #[test]
    fn test_parse_iceberg_config() {
        let values = [
            ("connector", "iceberg"),
            ("type", "upsert"),
            ("primary_key", "v1"),
            ("partition_by", "v1, identity(v1), truncate(4,v2), bucket(5,v1), year(v3), month(v4), day(v5), hour(v6), void(v1)"),
            ("warehouse.path", "s3://iceberg"),
            ("s3.endpoint", "http://127.0.0.1:9301"),
            ("s3.access.key", "hummockadmin"),
            ("s3.secret.key", "hummockadmin"),
            ("s3.path.style.access", "true"),
            ("s3.region", "us-east-1"),
            ("catalog.type", "jdbc"),
            ("catalog.name", "demo"),
            ("catalog.uri", "jdbc://postgresql://postgres:5432/iceberg"),
            ("catalog.jdbc.user", "admin"),
            ("catalog.jdbc.password", "123456"),
            ("database.name", "demo_db"),
            ("table.name", "demo_table"),
            ("enable_compaction", "true"),
            ("compaction_interval_sec", "1800"),
            ("enable_snapshot_expiration", "true"),
        ]
        .into_iter()
        .map(|(k, v)| (k.to_owned(), v.to_owned()))
        .collect();

        let iceberg_config = IcebergConfig::from_btreemap(values).unwrap();

        let expected_iceberg_config = IcebergConfig {
            common: IcebergCommon {
                warehouse_path: Some("s3://iceberg".to_owned()),
                catalog_uri: Some("jdbc://postgresql://postgres:5432/iceberg".to_owned()),
                s3_region: Some("us-east-1".to_owned()),
                s3_endpoint: Some("http://127.0.0.1:9301".to_owned()),
                s3_access_key: Some("hummockadmin".to_owned()),
                s3_secret_key: Some("hummockadmin".to_owned()),
                s3_iam_role_arn: None,
                gcs_credential: None,
                catalog_type: Some("jdbc".to_owned()),
                glue_id: None,
                glue_region: None,
                glue_access_key: None,
                glue_secret_key: None,
                glue_iam_role_arn: None,
                catalog_name: Some("demo".to_owned()),
                s3_path_style_access: Some(true),
                catalog_credential: None,
                catalog_oauth2_server_uri: None,
                catalog_scope: None,
                catalog_token: None,
                enable_config_load: None,
                rest_signing_name: None,
                rest_signing_region: None,
                rest_sigv4_enabled: None,
                hosted_catalog: None,
                azblob_account_name: None,
                azblob_account_key: None,
                azblob_endpoint_url: None,
                catalog_header: None,
                adlsgen2_account_name: None,
                adlsgen2_account_key: None,
                adlsgen2_endpoint: None,
                vended_credentials: None,
            },
            table: IcebergTableIdentifier {
                database_name: Some("demo_db".to_owned()),
                table_name: "demo_table".to_owned(),
            },
            r#type: "upsert".to_owned(),
            force_append_only: false,
            primary_key: Some(vec!["v1".to_owned()]),
            partition_by: Some("v1, identity(v1), truncate(4,v2), bucket(5,v1), year(v3), month(v4), day(v5), hour(v6), void(v1)".to_owned()),
            java_catalog_props: [("jdbc.user", "admin"), ("jdbc.password", "123456")]
                .into_iter()
                .map(|(k, v)| (k.to_owned(), v.to_owned()))
                .collect(),
            commit_checkpoint_interval: ICEBERG_DEFAULT_COMMIT_CHECKPOINT_INTERVAL,
            create_table_if_not_exists: false,
            is_exactly_once: Some(true),
            commit_retry_num: 8,
            enable_compaction: true,
            compaction_interval_sec: Some(DEFAULT_ICEBERG_COMPACTION_INTERVAL / 2),
            enable_snapshot_expiration: true,
            write_mode: IcebergWriteMode::MergeOnRead,
            snapshot_expiration_max_age_millis: None,
            snapshot_expiration_retain_last: None,
            snapshot_expiration_clear_expired_files: true,
            snapshot_expiration_clear_expired_meta_data: true,
            max_snapshots_num_before_compaction: None,
            small_files_threshold_mb: None,
            delete_files_count_threshold: None,
            trigger_snapshot_count: None,
            target_file_size_mb: None,
            compaction_type: None,
        };

        assert_eq!(iceberg_config, expected_iceberg_config);

        assert_eq!(
            &iceberg_config.full_table_name().unwrap().to_string(),
            "demo_db.demo_table"
        );
    }

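    /// Shared helper for the catalog smoke tests below: builds an `IcebergConfig` from the
    /// given options and fails if the table cannot be loaded through the configured catalog.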
    async fn test_create_catalog(configs: BTreeMap<String, String>) {
        let iceberg_config = IcebergConfig::from_btreemap(configs).unwrap();

        let _table = iceberg_config.load_table().await.unwrap();
    }

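    // The catalog tests below are `#[ignore]`d: they appear to expect externally running
    // services (an S3-compatible object store and, depending on the test, a REST, JDBC, or
    // Hive catalog) at the hard-coded endpoints, so run them manually, e.g. with
    // `cargo test -- --ignored`.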
    #[tokio::test]
    #[ignore]
    async fn test_storage_catalog() {
        let values = [
            ("connector", "iceberg"),
            ("type", "append-only"),
            ("force_append_only", "true"),
            ("s3.endpoint", "http://127.0.0.1:9301"),
            ("s3.access.key", "hummockadmin"),
            ("s3.secret.key", "hummockadmin"),
            ("s3.region", "us-east-1"),
            ("s3.path.style.access", "true"),
            ("catalog.name", "demo"),
            ("catalog.type", "storage"),
            ("warehouse.path", "s3://icebergdata/demo"),
            ("database.name", "s1"),
            ("table.name", "t1"),
        ]
        .into_iter()
        .map(|(k, v)| (k.to_owned(), v.to_owned()))
        .collect();

        test_create_catalog(values).await;
    }

    #[tokio::test]
    #[ignore]
    async fn test_rest_catalog() {
        let values = [
            ("connector", "iceberg"),
            ("type", "append-only"),
            ("force_append_only", "true"),
            ("s3.endpoint", "http://127.0.0.1:9301"),
            ("s3.access.key", "hummockadmin"),
            ("s3.secret.key", "hummockadmin"),
            ("s3.region", "us-east-1"),
            ("s3.path.style.access", "true"),
            ("catalog.name", "demo"),
            ("catalog.type", "rest"),
            ("catalog.uri", "http://192.168.167.4:8181"),
            ("warehouse.path", "s3://icebergdata/demo"),
            ("database.name", "s1"),
            ("table.name", "t1"),
        ]
        .into_iter()
        .map(|(k, v)| (k.to_owned(), v.to_owned()))
        .collect();

        test_create_catalog(values).await;
    }

    #[tokio::test]
    #[ignore]
    async fn test_jdbc_catalog() {
        let values = [
            ("connector", "iceberg"),
            ("type", "append-only"),
            ("force_append_only", "true"),
            ("s3.endpoint", "http://127.0.0.1:9301"),
            ("s3.access.key", "hummockadmin"),
            ("s3.secret.key", "hummockadmin"),
            ("s3.region", "us-east-1"),
            ("s3.path.style.access", "true"),
            ("catalog.name", "demo"),
            ("catalog.type", "jdbc"),
            ("catalog.uri", "jdbc:postgresql://localhost:5432/iceberg"),
            ("catalog.jdbc.user", "admin"),
            ("catalog.jdbc.password", "123456"),
            ("warehouse.path", "s3://icebergdata/demo"),
            ("database.name", "s1"),
            ("table.name", "t1"),
        ]
        .into_iter()
        .map(|(k, v)| (k.to_owned(), v.to_owned()))
        .collect();

        test_create_catalog(values).await;
    }

    #[tokio::test]
    #[ignore]
    async fn test_hive_catalog() {
        let values = [
            ("connector", "iceberg"),
            ("type", "append-only"),
            ("force_append_only", "true"),
            ("s3.endpoint", "http://127.0.0.1:9301"),
            ("s3.access.key", "hummockadmin"),
            ("s3.secret.key", "hummockadmin"),
            ("s3.region", "us-east-1"),
            ("s3.path.style.access", "true"),
            ("catalog.name", "demo"),
            ("catalog.type", "hive"),
            ("catalog.uri", "thrift://localhost:9083"),
            ("warehouse.path", "s3://icebergdata/demo"),
            ("database.name", "s1"),
            ("table.name", "t1"),
        ]
        .into_iter()
        .map(|(k, v)| (k.to_owned(), v.to_owned()))
        .collect();

        test_create_catalog(values).await;
    }

    #[test]
    fn test_config_constants_consistency() {
        // This test ensures our constants match the expected configuration names
        // If you change a constant, this test will remind you to update both places
        assert_eq!(ENABLE_COMPACTION, "enable_compaction");
        assert_eq!(COMPACTION_INTERVAL_SEC, "compaction_interval_sec");
        assert_eq!(ENABLE_SNAPSHOT_EXPIRATION, "enable_snapshot_expiration");
        assert_eq!(WRITE_MODE, "write_mode");
        assert_eq!(
            SNAPSHOT_EXPIRATION_RETAIN_LAST,
            "snapshot_expiration_retain_last"
        );
        assert_eq!(
            SNAPSHOT_EXPIRATION_MAX_AGE_MILLIS,
            "snapshot_expiration_max_age_millis"
        );
        assert_eq!(
            SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_FILES,
            "snapshot_expiration_clear_expired_files"
        );
        assert_eq!(
            SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_META_DATA,
            "snapshot_expiration_clear_expired_meta_data"
        );
        assert_eq!(COMPACTION_MAX_SNAPSHOTS_NUM, "compaction.max_snapshots_num");
    }

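    /// Exercises the `compaction.*`-prefixed option names and checks that each one is mapped
    /// onto the corresponding `IcebergConfig` field.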
    #[test]
    fn test_parse_iceberg_compaction_config() {
        // Test parsing with new compaction.* prefix config names
        let values = [
            ("connector", "iceberg"),
            ("type", "upsert"),
            ("primary_key", "id"),
            ("warehouse.path", "s3://iceberg"),
            ("s3.endpoint", "http://127.0.0.1:9301"),
            ("s3.access.key", "test"),
            ("s3.secret.key", "test"),
            ("s3.region", "us-east-1"),
            ("catalog.type", "storage"),
            ("catalog.name", "demo"),
            ("database.name", "test_db"),
            ("table.name", "test_table"),
            ("enable_compaction", "true"),
            ("compaction.max_snapshots_num", "100"),
            ("compaction.small_files_threshold_mb", "512"),
            ("compaction.delete_files_count_threshold", "50"),
            ("compaction.trigger_snapshot_count", "10"),
            ("compaction.target_file_size_mb", "256"),
            ("compaction.type", "full"),
        ]
        .into_iter()
        .map(|(k, v)| (k.to_owned(), v.to_owned()))
        .collect();

        let iceberg_config = IcebergConfig::from_btreemap(values).unwrap();

        // Verify all compaction config fields are parsed correctly
        assert!(iceberg_config.enable_compaction);
        assert_eq!(
            iceberg_config.max_snapshots_num_before_compaction,
            Some(100)
        );
        assert_eq!(iceberg_config.small_files_threshold_mb, Some(512));
        assert_eq!(iceberg_config.delete_files_count_threshold, Some(50));
        assert_eq!(iceberg_config.trigger_snapshot_count, Some(10));
        assert_eq!(iceberg_config.target_file_size_mb, Some(256));
        assert_eq!(iceberg_config.compaction_type, Some(CompactionType::Full));
    }
}