risingwave_connector/sink/iceberg/mod.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod prometheus;
16use std::collections::{BTreeMap, HashMap};
17use std::fmt::Debug;
18use std::num::NonZeroU64;
19use std::str::FromStr;
20use std::sync::Arc;
21use std::time::Duration;
22
23use anyhow::{Context, anyhow};
24use async_trait::async_trait;
25use await_tree::InstrumentAwait;
26use iceberg::arrow::{
27    RecordBatchPartitionSplitter, arrow_schema_to_schema, schema_to_arrow_schema,
28};
29use iceberg::spec::{
30    DataFile, FormatVersion, MAIN_BRANCH, Operation, PartitionSpecRef,
31    SchemaRef as IcebergSchemaRef, SerializedDataFile, TableProperties, Transform,
32    UnboundPartitionField, UnboundPartitionSpec,
33};
34use iceberg::table::Table;
35use iceberg::transaction::{ApplyTransactionAction, FastAppendAction, Transaction};
36use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
37use iceberg::writer::base_writer::deletion_vector_writer::{
38    DeletionVectorWriter, DeletionVectorWriterBuilder,
39};
40use iceberg::writer::base_writer::equality_delete_writer::{
41    EqualityDeleteFileWriterBuilder, EqualityDeleteWriterConfig,
42};
43use iceberg::writer::base_writer::position_delete_file_writer::{
44    POSITION_DELETE_SCHEMA, PositionDeleteFileWriter, PositionDeleteFileWriterBuilder,
45    PositionDeleteInput,
46};
47use iceberg::writer::delta_writer::{DELETE_OP, DeltaWriterBuilder, INSERT_OP};
48use iceberg::writer::file_writer::ParquetWriterBuilder;
49use iceberg::writer::file_writer::location_generator::{
50    DefaultFileNameGenerator, DefaultLocationGenerator,
51};
52use iceberg::writer::file_writer::rolling_writer::RollingFileWriterBuilder;
53use iceberg::writer::task_writer::TaskWriter;
54use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
55use iceberg::{Catalog, NamespaceIdent, TableCreation, TableIdent};
56use itertools::Itertools;
57use parquet::file::properties::WriterProperties;
58use prometheus::monitored_general_writer::MonitoredGeneralWriterBuilder;
59use regex::Regex;
60use risingwave_common::array::arrow::arrow_array_iceberg::{Int32Array, RecordBatch};
61use risingwave_common::array::arrow::arrow_schema_iceberg::{
62    self, DataType as ArrowDataType, Field as ArrowField, Fields as ArrowFields,
63    Schema as ArrowSchema, SchemaRef,
64};
65use risingwave_common::array::arrow::{IcebergArrowConvert, IcebergCreateTableArrowConvert};
66use risingwave_common::array::{Op, StreamChunk};
67use risingwave_common::bail;
68use risingwave_common::bitmap::Bitmap;
69use risingwave_common::catalog::{Field, Schema};
70use risingwave_common::error::IcebergError;
71use risingwave_common::metrics::{LabelGuardedHistogram, LabelGuardedIntCounter};
72use risingwave_common_estimate_size::EstimateSize;
73use risingwave_pb::connector_service::SinkMetadata;
74use risingwave_pb::connector_service::sink_metadata::Metadata::Serialized;
75use risingwave_pb::connector_service::sink_metadata::SerializedMetadata;
76use risingwave_pb::stream_plan::PbSinkSchemaChange;
77use serde::de::{self, Deserializer, Visitor};
78use serde::{Deserialize, Serialize};
79use serde_json::from_value;
80use serde_with::{DisplayFromStr, serde_as};
81use thiserror_ext::AsReport;
82use tokio::sync::mpsc::UnboundedSender;
83use tokio_retry::RetryIf;
84use tokio_retry::strategy::{ExponentialBackoff, jitter};
85use tracing::warn;
86use url::Url;
87use uuid::Uuid;
88use with_options::WithOptions;
89
90use super::decouple_checkpoint_log_sink::iceberg_default_commit_checkpoint_interval;
91use super::{
92    GLOBAL_SINK_METRICS, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT, Sink,
93    SinkError, SinkWriterParam,
94};
95use crate::connector_common::{IcebergCommon, IcebergSinkCompactionUpdate, IcebergTableIdentifier};
96use crate::enforce_secret::EnforceSecret;
97use crate::sink::catalog::SinkId;
98use crate::sink::coordinate::CoordinatedLogSinker;
99use crate::sink::writer::SinkWriter;
100use crate::sink::{
101    Result, SinglePhaseCommitCoordinator, SinkCommitCoordinator, SinkParam,
102    TwoPhaseCommitCoordinator,
103};
104use crate::{deserialize_bool_from_string, deserialize_optional_string_seq_from_string};
105
106pub const ICEBERG_SINK: &str = "iceberg";
107
108pub const ICEBERG_COW_BRANCH: &str = "ingestion";
109pub const ICEBERG_WRITE_MODE_MERGE_ON_READ: &str = "merge-on-read";
110pub const ICEBERG_WRITE_MODE_COPY_ON_WRITE: &str = "copy-on-write";
111pub const ICEBERG_COMPACTION_TYPE_FULL: &str = "full";
112pub const ICEBERG_COMPACTION_TYPE_SMALL_FILES: &str = "small-files";
113pub const ICEBERG_COMPACTION_TYPE_FILES_WITH_DELETE: &str = "files-with-delete";
114
115pub const PARTITION_DATA_ID_START: i32 = 1000;
116
117#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default, Serialize, Deserialize)]
118#[serde(rename_all = "kebab-case")]
119pub enum IcebergWriteMode {
120    #[default]
121    MergeOnRead,
122    CopyOnWrite,
123}
124
125impl IcebergWriteMode {
126    pub fn as_str(self) -> &'static str {
127        match self {
128            IcebergWriteMode::MergeOnRead => ICEBERG_WRITE_MODE_MERGE_ON_READ,
129            IcebergWriteMode::CopyOnWrite => ICEBERG_WRITE_MODE_COPY_ON_WRITE,
130        }
131    }
132}
133
134impl std::str::FromStr for IcebergWriteMode {
135    type Err = SinkError;
136
137    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
138        match s {
139            ICEBERG_WRITE_MODE_MERGE_ON_READ => Ok(IcebergWriteMode::MergeOnRead),
140            ICEBERG_WRITE_MODE_COPY_ON_WRITE => Ok(IcebergWriteMode::CopyOnWrite),
141            _ => Err(SinkError::Config(anyhow!(format!(
142                "invalid write_mode: {}, must be one of: {}, {}",
143                s, ICEBERG_WRITE_MODE_MERGE_ON_READ, ICEBERG_WRITE_MODE_COPY_ON_WRITE
144            )))),
145        }
146    }
147}
148
149impl TryFrom<&str> for IcebergWriteMode {
150    type Error = <Self as std::str::FromStr>::Err;
151
152    fn try_from(value: &str) -> std::result::Result<Self, Self::Error> {
153        value.parse()
154    }
155}
156
157impl TryFrom<String> for IcebergWriteMode {
158    type Error = <Self as std::str::FromStr>::Err;
159
160    fn try_from(value: String) -> std::result::Result<Self, Self::Error> {
161        value.as_str().parse()
162    }
163}
164
165impl std::fmt::Display for IcebergWriteMode {
166    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
167        f.write_str(self.as_str())
168    }
169}
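// A minimal, test-only sketch of how `write_mode` strings round-trip through
// `IcebergWriteMode` via the `FromStr`/`Display` impls above; the literal values
// come from the constants defined earlier in this module.
#[cfg(test)]
mod iceberg_write_mode_example {
    use super::*;

    #[test]
    fn parse_and_display_round_trip() {
        let mode: IcebergWriteMode = ICEBERG_WRITE_MODE_MERGE_ON_READ.parse().unwrap();
        assert_eq!(mode, IcebergWriteMode::MergeOnRead);
        assert_eq!(mode.to_string(), ICEBERG_WRITE_MODE_MERGE_ON_READ);
        // Unknown values are rejected with a `SinkError::Config`.
        assert!("overwrite".parse::<IcebergWriteMode>().is_err());
    }
}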
170
171// Configuration constants
172pub const ENABLE_COMPACTION: &str = "enable_compaction";
173pub const COMPACTION_INTERVAL_SEC: &str = "compaction_interval_sec";
174pub const ENABLE_SNAPSHOT_EXPIRATION: &str = "enable_snapshot_expiration";
175pub const WRITE_MODE: &str = "write_mode";
176pub const FORMAT_VERSION: &str = "format_version";
177pub const SNAPSHOT_EXPIRATION_RETAIN_LAST: &str = "snapshot_expiration_retain_last";
178pub const SNAPSHOT_EXPIRATION_MAX_AGE_MILLIS: &str = "snapshot_expiration_max_age_millis";
179pub const SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_FILES: &str = "snapshot_expiration_clear_expired_files";
180pub const SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_META_DATA: &str =
181    "snapshot_expiration_clear_expired_meta_data";
182pub const COMPACTION_MAX_SNAPSHOTS_NUM: &str = "compaction.max_snapshots_num";
183
184pub const COMPACTION_SMALL_FILES_THRESHOLD_MB: &str = "compaction.small_files_threshold_mb";
185
186pub const COMPACTION_DELETE_FILES_COUNT_THRESHOLD: &str = "compaction.delete_files_count_threshold";
187
188pub const COMPACTION_TRIGGER_SNAPSHOT_COUNT: &str = "compaction.trigger_snapshot_count";
189
190pub const COMPACTION_TARGET_FILE_SIZE_MB: &str = "compaction.target_file_size_mb";
191
192pub const COMPACTION_TYPE: &str = "compaction.type";
193
194fn default_commit_retry_num() -> u32 {
195    8
196}
197
198fn default_iceberg_write_mode() -> IcebergWriteMode {
199    IcebergWriteMode::MergeOnRead
200}
201
202fn default_iceberg_format_version() -> FormatVersion {
203    FormatVersion::V2
204}
205
206fn default_true() -> bool {
207    true
208}
209
210fn default_some_true() -> Option<bool> {
211    Some(true)
212}
213
214fn parse_format_version_str(value: &str) -> std::result::Result<FormatVersion, String> {
215    let parsed = value
216        .trim()
217        .parse::<u8>()
218        .map_err(|_| "`format-version` must be one of 1, 2, or 3".to_owned())?;
219    match parsed {
220        1 => Ok(FormatVersion::V1),
221        2 => Ok(FormatVersion::V2),
222        3 => Ok(FormatVersion::V3),
223        _ => Err("`format-version` must be one of 1, 2, or 3".to_owned()),
224    }
225}
226
227fn deserialize_format_version<'de, D>(
228    deserializer: D,
229) -> std::result::Result<FormatVersion, D::Error>
230where
231    D: Deserializer<'de>,
232{
233    struct FormatVersionVisitor;
234
235    impl<'de> Visitor<'de> for FormatVersionVisitor {
236        type Value = FormatVersion;
237
238        fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
239            formatter.write_str("format-version as 1, 2, or 3")
240        }
241
242        fn visit_u64<E>(self, value: u64) -> std::result::Result<Self::Value, E>
243        where
244            E: de::Error,
245        {
246            let value = u8::try_from(value)
247                .map_err(|_| E::custom("`format-version` must be one of 1, 2, or 3"))?;
248            parse_format_version_str(&value.to_string()).map_err(E::custom)
249        }
250
251        fn visit_i64<E>(self, value: i64) -> std::result::Result<Self::Value, E>
252        where
253            E: de::Error,
254        {
255            let value = u8::try_from(value)
256                .map_err(|_| E::custom("`format-version` must be one of 1, 2, or 3"))?;
257            parse_format_version_str(&value.to_string()).map_err(E::custom)
258        }
259
260        fn visit_str<E>(self, value: &str) -> std::result::Result<Self::Value, E>
261        where
262            E: de::Error,
263        {
264            parse_format_version_str(value).map_err(E::custom)
265        }
266
267        fn visit_string<E>(self, value: String) -> std::result::Result<Self::Value, E>
268        where
269            E: de::Error,
270        {
271            self.visit_str(&value)
272        }
273    }
274
275    deserializer.deserialize_any(FormatVersionVisitor)
276}
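// A small, test-only sketch of the `format-version` forms the deserializer above
// accepts (a string or an integer); `serde_json` is used here purely for illustration.
#[cfg(test)]
mod format_version_deserialize_example {
    use super::*;

    #[derive(Deserialize)]
    struct Probe {
        #[serde(deserialize_with = "deserialize_format_version")]
        version: FormatVersion,
    }

    #[test]
    fn accepts_string_and_integer_forms() {
        let s: Probe = serde_json::from_str(r#"{"version": "2"}"#).unwrap();
        assert_eq!(s.version, FormatVersion::V2);
        let i: Probe = serde_json::from_str(r#"{"version": 3}"#).unwrap();
        assert_eq!(i.version, FormatVersion::V3);
        // Anything outside 1..=3 is rejected.
        assert!(serde_json::from_str::<Probe>(r#"{"version": "4"}"#).is_err());
    }
}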
277
278/// Compaction type for Iceberg sink
279#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
280#[serde(rename_all = "kebab-case")]
281pub enum CompactionType {
282    /// Full compaction - rewrites all data files
283    #[default]
284    Full,
285    /// Small files compaction - only compact small files
286    SmallFiles,
287    /// Files with delete compaction - only compact files that have associated delete files
288    FilesWithDelete,
289}
290
291impl CompactionType {
292    pub fn as_str(&self) -> &'static str {
293        match self {
294            CompactionType::Full => ICEBERG_COMPACTION_TYPE_FULL,
295            CompactionType::SmallFiles => ICEBERG_COMPACTION_TYPE_SMALL_FILES,
296            CompactionType::FilesWithDelete => ICEBERG_COMPACTION_TYPE_FILES_WITH_DELETE,
297        }
298    }
299}
300
301impl std::str::FromStr for CompactionType {
302    type Err = SinkError;
303
304    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
305        match s {
306            ICEBERG_COMPACTION_TYPE_FULL => Ok(CompactionType::Full),
307            ICEBERG_COMPACTION_TYPE_SMALL_FILES => Ok(CompactionType::SmallFiles),
308            ICEBERG_COMPACTION_TYPE_FILES_WITH_DELETE => Ok(CompactionType::FilesWithDelete),
309            _ => Err(SinkError::Config(anyhow!(format!(
310                "invalid compaction_type: {}, must be one of: {}, {}, {}",
311                s,
312                ICEBERG_COMPACTION_TYPE_FULL,
313                ICEBERG_COMPACTION_TYPE_SMALL_FILES,
314                ICEBERG_COMPACTION_TYPE_FILES_WITH_DELETE
315            )))),
316        }
317    }
318}
319
320impl TryFrom<&str> for CompactionType {
321    type Error = <Self as std::str::FromStr>::Err;
322
323    fn try_from(value: &str) -> std::result::Result<Self, Self::Error> {
324        value.parse()
325    }
326}
327
328impl TryFrom<String> for CompactionType {
329    type Error = <Self as std::str::FromStr>::Err;
330
331    fn try_from(value: String) -> std::result::Result<Self, Self::Error> {
332        value.as_str().parse()
333    }
334}
335
336impl std::fmt::Display for CompactionType {
337    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
338        write!(f, "{}", self.as_str())
339    }
340}
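// Test-only sketch: how `compaction.type` strings map onto `CompactionType`, and
// that an unset value falls back to `Full` (see `IcebergConfig::compaction_type` below).
#[cfg(test)]
mod compaction_type_example {
    use super::*;

    #[test]
    fn parse_and_default() {
        assert_eq!(
            ICEBERG_COMPACTION_TYPE_SMALL_FILES
                .parse::<CompactionType>()
                .unwrap(),
            CompactionType::SmallFiles
        );
        // Unset `compaction.type` means `Full` via `#[default]`.
        assert_eq!(CompactionType::default(), CompactionType::Full);
        assert!("minor".parse::<CompactionType>().is_err());
    }
}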
341
342#[serde_as]
343#[derive(Debug, Clone, PartialEq, Eq, Deserialize, WithOptions)]
344pub struct IcebergConfig {
345    pub r#type: String, // accept "append-only" or "upsert"
346
347    #[serde(default, deserialize_with = "deserialize_bool_from_string")]
348    pub force_append_only: bool,
349
350    #[serde(flatten)]
351    common: IcebergCommon,
352
353    #[serde(flatten)]
354    table: IcebergTableIdentifier,
355
356    #[serde(
357        rename = "primary_key",
358        default,
359        deserialize_with = "deserialize_optional_string_seq_from_string"
360    )]
361    pub primary_key: Option<Vec<String>>,
362
363    // Properties passed through to the Java catalog.
364    #[serde(skip)]
365    pub java_catalog_props: HashMap<String, String>,
366
367    #[serde(default)]
368    pub partition_by: Option<String>,
369
370    /// Commit every n (> 0) checkpoints; the default is 60.
371    #[serde(default = "iceberg_default_commit_checkpoint_interval")]
372    #[serde_as(as = "DisplayFromStr")]
373    #[with_option(allow_alter_on_fly)]
374    pub commit_checkpoint_interval: u64,
375
376    #[serde(default, deserialize_with = "deserialize_bool_from_string")]
377    pub create_table_if_not_exists: bool,
378
379    /// Whether the sink is exactly-once; the default is true.
380    #[serde(default = "default_some_true")]
381    #[serde_as(as = "Option<DisplayFromStr>")]
382    pub is_exactly_once: Option<bool>,
383    // Number of commit retries when an Iceberg commit fails; the default is 8.
384    // # TODO
385    // The Iceberg table may store the commit retry count in its table metadata.
386    // We should look that up and use it as the default commit retry count first.
387    #[serde(default = "default_commit_retry_num")]
388    pub commit_retry_num: u32,
389
390    /// Whether to enable iceberg compaction.
391    #[serde(
392        rename = "enable_compaction",
393        default,
394        deserialize_with = "deserialize_bool_from_string"
395    )]
396    #[with_option(allow_alter_on_fly)]
397    pub enable_compaction: bool,
398
399    /// The interval of Iceberg compaction, in seconds.
400    #[serde(rename = "compaction_interval_sec", default)]
401    #[serde_as(as = "Option<DisplayFromStr>")]
402    #[with_option(allow_alter_on_fly)]
403    pub compaction_interval_sec: Option<u64>,
404
405    /// Whether to enable Iceberg snapshot expiration.
406    #[serde(
407        rename = "enable_snapshot_expiration",
408        default,
409        deserialize_with = "deserialize_bool_from_string"
410    )]
411    #[with_option(allow_alter_on_fly)]
412    pub enable_snapshot_expiration: bool,
413
414    /// The iceberg write mode, can be `merge-on-read` or `copy-on-write`.
415    #[serde(rename = "write_mode", default = "default_iceberg_write_mode")]
416    pub write_mode: IcebergWriteMode,
417
418    /// Iceberg format version for table creation.
419    #[serde(
420        rename = "format_version",
421        default = "default_iceberg_format_version",
422        deserialize_with = "deserialize_format_version"
423    )]
424    pub format_version: FormatVersion,
425
426    /// The maximum age (in milliseconds) of snapshots before they expire.
427    /// For example, if set to 3600000, snapshots older than 1 hour will be expired.
428    #[serde(rename = "snapshot_expiration_max_age_millis", default)]
429    #[serde_as(as = "Option<DisplayFromStr>")]
430    #[with_option(allow_alter_on_fly)]
431    pub snapshot_expiration_max_age_millis: Option<i64>,
432
433    /// The number of most recent snapshots to retain.
434    #[serde(rename = "snapshot_expiration_retain_last", default)]
435    #[serde_as(as = "Option<DisplayFromStr>")]
436    #[with_option(allow_alter_on_fly)]
437    pub snapshot_expiration_retain_last: Option<i32>,
438
439    #[serde(
440        rename = "snapshot_expiration_clear_expired_files",
441        default = "default_true",
442        deserialize_with = "deserialize_bool_from_string"
443    )]
444    #[with_option(allow_alter_on_fly)]
445    pub snapshot_expiration_clear_expired_files: bool,
446
447    #[serde(
448        rename = "snapshot_expiration_clear_expired_meta_data",
449        default = "default_true",
450        deserialize_with = "deserialize_bool_from_string"
451    )]
452    #[with_option(allow_alter_on_fly)]
453    pub snapshot_expiration_clear_expired_meta_data: bool,
454
455    /// The maximum number of snapshots allowed since the last rewrite operation.
456    /// If set, the sink checks the snapshot count and waits when it is exceeded.
457    #[serde(rename = "compaction.max_snapshots_num", default)]
458    #[serde_as(as = "Option<DisplayFromStr>")]
459    #[with_option(allow_alter_on_fly)]
460    pub max_snapshots_num_before_compaction: Option<usize>,
461
462    #[serde(rename = "compaction.small_files_threshold_mb", default)]
463    #[serde_as(as = "Option<DisplayFromStr>")]
464    #[with_option(allow_alter_on_fly)]
465    pub small_files_threshold_mb: Option<u64>,
466
467    #[serde(rename = "compaction.delete_files_count_threshold", default)]
468    #[serde_as(as = "Option<DisplayFromStr>")]
469    #[with_option(allow_alter_on_fly)]
470    pub delete_files_count_threshold: Option<usize>,
471
472    #[serde(rename = "compaction.trigger_snapshot_count", default)]
473    #[serde_as(as = "Option<DisplayFromStr>")]
474    #[with_option(allow_alter_on_fly)]
475    pub trigger_snapshot_count: Option<usize>,
476
477    #[serde(rename = "compaction.target_file_size_mb", default)]
478    #[serde_as(as = "Option<DisplayFromStr>")]
479    #[with_option(allow_alter_on_fly)]
480    pub target_file_size_mb: Option<u64>,
481
482    /// Compaction type: `full`, `small-files`, or `files-with-delete`.
483    /// If not set, defaults to `full`.
484    #[serde(rename = "compaction.type", default)]
485    #[with_option(allow_alter_on_fly)]
486    pub compaction_type: Option<CompactionType>,
487}
488
489impl EnforceSecret for IcebergConfig {
490    fn enforce_secret<'a>(
491        prop_iter: impl Iterator<Item = &'a str>,
492    ) -> crate::error::ConnectorResult<()> {
493        for prop in prop_iter {
494            IcebergCommon::enforce_one(prop)?;
495        }
496        Ok(())
497    }
498
499    fn enforce_one(prop: &str) -> crate::error::ConnectorResult<()> {
500        IcebergCommon::enforce_one(prop)
501    }
502}
503
504impl IcebergConfig {
505    /// Validate that append-only sinks use merge-on-read mode
506    /// Copy-on-write is strictly worse than merge-on-read for append-only workloads
507    fn validate_append_only_write_mode(
508        sink_type: &str,
509        write_mode: IcebergWriteMode,
510    ) -> Result<()> {
511        if sink_type == SINK_TYPE_APPEND_ONLY && write_mode == IcebergWriteMode::CopyOnWrite {
512            return Err(SinkError::Config(anyhow!(
513                "'copy-on-write' mode is not supported for append-only iceberg sink. \
514                 Please use 'merge-on-read' instead, which is strictly better for append-only workloads."
515            )));
516        }
517        Ok(())
518    }
519
520    pub fn from_btreemap(values: BTreeMap<String, String>) -> Result<Self> {
521        let mut config =
522            serde_json::from_value::<IcebergConfig>(serde_json::to_value(&values).unwrap())
523                .map_err(|e| SinkError::Config(anyhow!(e)))?;
524
525        if config.r#type != SINK_TYPE_APPEND_ONLY && config.r#type != SINK_TYPE_UPSERT {
526            return Err(SinkError::Config(anyhow!(
527                "`{}` must be {}, or {}",
528                SINK_TYPE_OPTION,
529                SINK_TYPE_APPEND_ONLY,
530                SINK_TYPE_UPSERT
531            )));
532        }
533
534        if config.r#type == SINK_TYPE_UPSERT {
535            if let Some(primary_key) = &config.primary_key {
536                if primary_key.is_empty() {
537                    return Err(SinkError::Config(anyhow!(
538                        "`primary-key` must not be empty in {}",
539                        SINK_TYPE_UPSERT
540                    )));
541                }
542            } else {
543                return Err(SinkError::Config(anyhow!(
544                    "Must set `primary-key` in {}",
545                    SINK_TYPE_UPSERT
546                )));
547            }
548        }
549
550        // Enforce merge-on-read for append-only sinks
551        Self::validate_append_only_write_mode(&config.r#type, config.write_mode)?;
552
553        // All configs start with "catalog." will be treated as java configs.
554        config.java_catalog_props = values
555            .iter()
556            .filter(|(k, _v)| {
557                k.starts_with("catalog.")
558                    && k != &"catalog.uri"
559                    && k != &"catalog.type"
560                    && k != &"catalog.name"
561                    && k != &"catalog.header"
562            })
563            .map(|(k, v)| (k[8..].to_string(), v.clone()))
564            .collect();
565
566        if config.commit_checkpoint_interval == 0 {
567            return Err(SinkError::Config(anyhow!(
568                "`commit-checkpoint-interval` must be greater than 0"
569            )));
570        }
571
572        Ok(config)
573    }
574
575    pub fn catalog_type(&self) -> &str {
576        self.common.catalog_type()
577    }
578
579    pub async fn load_table(&self) -> Result<Table> {
580        self.common
581            .load_table(&self.table, &self.java_catalog_props)
582            .await
583            .map_err(Into::into)
584    }
585
586    pub async fn create_catalog(&self) -> Result<Arc<dyn Catalog>> {
587        self.common
588            .create_catalog(&self.java_catalog_props)
589            .await
590            .map_err(Into::into)
591    }
592
593    pub fn full_table_name(&self) -> Result<TableIdent> {
594        self.table.to_table_ident().map_err(Into::into)
595    }
596
597    pub fn catalog_name(&self) -> String {
598        self.common.catalog_name()
599    }
600
601    pub fn table_format_version(&self) -> FormatVersion {
602        self.format_version
603    }
604
605    pub fn compaction_interval_sec(&self) -> u64 {
606        // default to 1 hour
607        self.compaction_interval_sec.unwrap_or(3600)
608    }
609
610    /// Calculate the timestamp (in milliseconds) before which snapshots should be expired.
611    /// Returns `current_time_ms - max_age_millis`, or `None` if no max age is configured.
612    pub fn snapshot_expiration_timestamp_ms(&self, current_time_ms: i64) -> Option<i64> {
613        self.snapshot_expiration_max_age_millis
614            .map(|max_age_millis| current_time_ms - max_age_millis)
615    }
616
617    pub fn trigger_snapshot_count(&self) -> usize {
618        self.trigger_snapshot_count.unwrap_or(usize::MAX)
619    }
620
621    pub fn small_files_threshold_mb(&self) -> u64 {
622        self.small_files_threshold_mb.unwrap_or(64)
623    }
624
625    pub fn delete_files_count_threshold(&self) -> usize {
626        self.delete_files_count_threshold.unwrap_or(256)
627    }
628
629    pub fn target_file_size_mb(&self) -> u64 {
630        self.target_file_size_mb.unwrap_or(1024)
631    }
632
633    /// Get the compaction type as an enum.
634    /// Returns the configured value, or `CompactionType::Full` if not set.
635    pub fn compaction_type(&self) -> CompactionType {
636        self.compaction_type.unwrap_or_default()
637    }
638}
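// Test-only sketch of the append-only / write-mode constraint documented on
// `validate_append_only_write_mode`: copy-on-write is rejected for append-only
// sinks, while merge-on-read (and copy-on-write for upsert) passes.
#[cfg(test)]
mod append_only_write_mode_example {
    use super::*;

    #[test]
    fn copy_on_write_rejected_for_append_only() {
        assert!(
            IcebergConfig::validate_append_only_write_mode(
                SINK_TYPE_APPEND_ONLY,
                IcebergWriteMode::CopyOnWrite
            )
            .is_err()
        );
        assert!(
            IcebergConfig::validate_append_only_write_mode(
                SINK_TYPE_APPEND_ONLY,
                IcebergWriteMode::MergeOnRead
            )
            .is_ok()
        );
        assert!(
            IcebergConfig::validate_append_only_write_mode(
                SINK_TYPE_UPSERT,
                IcebergWriteMode::CopyOnWrite
            )
            .is_ok()
        );
    }
}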
639
640pub struct IcebergSink {
641    pub config: IcebergConfig,
642    param: SinkParam,
643    // In upsert mode, this is never `None` and never empty.
644    unique_column_ids: Option<Vec<usize>>,
645}
646
647impl EnforceSecret for IcebergSink {
648    fn enforce_secret<'a>(
649        prop_iter: impl Iterator<Item = &'a str>,
650    ) -> crate::error::ConnectorResult<()> {
651        for prop in prop_iter {
652            IcebergConfig::enforce_one(prop)?;
653        }
654        Ok(())
655    }
656}
657
658impl TryFrom<SinkParam> for IcebergSink {
659    type Error = SinkError;
660
661    fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
662        let config = IcebergConfig::from_btreemap(param.properties.clone())?;
663        IcebergSink::new(config, param)
664    }
665}
666
667impl Debug for IcebergSink {
668    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
669        f.debug_struct("IcebergSink")
670            .field("config", &self.config)
671            .finish()
672    }
673}
674
675async fn create_and_validate_table_impl(
676    config: &IcebergConfig,
677    param: &SinkParam,
678) -> Result<Table> {
679    if config.create_table_if_not_exists {
680        create_table_if_not_exists_impl(config, param).await?;
681    }
682
683    let table = config
684        .load_table()
685        .await
686        .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
687
688    let sink_schema = param.schema();
689    let iceberg_arrow_schema = schema_to_arrow_schema(table.metadata().current_schema())
690        .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
691
692    try_matches_arrow_schema(&sink_schema, &iceberg_arrow_schema)
693        .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
694
695    Ok(table)
696}
697
698async fn create_table_if_not_exists_impl(config: &IcebergConfig, param: &SinkParam) -> Result<()> {
699    let catalog = config.create_catalog().await?;
700    let namespace = if let Some(database_name) = config.table.database_name() {
701        let namespace = NamespaceIdent::new(database_name.to_owned());
702        if !catalog
703            .namespace_exists(&namespace)
704            .await
705            .map_err(|e| SinkError::Iceberg(anyhow!(e)))?
706        {
707            catalog
708                .create_namespace(&namespace, HashMap::default())
709                .await
710                .map_err(|e| SinkError::Iceberg(anyhow!(e)))
711                .context("failed to create iceberg namespace")?;
712        }
713        namespace
714    } else {
715        bail!("database name must be set if you want to create table")
716    };
717
718    let table_id = config
719        .full_table_name()
720        .context("Unable to parse table name")?;
721    if !catalog
722        .table_exists(&table_id)
723        .await
724        .map_err(|e| SinkError::Iceberg(anyhow!(e)))?
725    {
726        let iceberg_create_table_arrow_convert = IcebergCreateTableArrowConvert::default();
727        // convert risingwave schema -> arrow schema -> iceberg schema
728        let arrow_fields = param
729            .columns
730            .iter()
731            .map(|column| {
732                Ok(iceberg_create_table_arrow_convert
733                    .to_arrow_field(&column.name, &column.data_type)
734                    .map_err(|e| SinkError::Iceberg(anyhow!(e)))
735                    .context(format!(
736                        "failed to convert {}: {} to arrow type",
737                        &column.name, &column.data_type
738                    ))?)
739            })
740            .collect::<Result<Vec<ArrowField>>>()?;
741        let arrow_schema = arrow_schema_iceberg::Schema::new(arrow_fields);
742        let iceberg_schema = iceberg::arrow::arrow_schema_to_schema(&arrow_schema)
743            .map_err(|e| SinkError::Iceberg(anyhow!(e)))
744            .context("failed to convert arrow schema to iceberg schema")?;
745
746        let location = {
747            let mut names = namespace.clone().inner();
748            names.push(config.table.table_name().to_owned());
749            match &config.common.warehouse_path {
750                Some(warehouse_path) => {
751                    let is_s3_tables = warehouse_path.starts_with("arn:aws:s3tables");
752                    // BigLake catalog federation uses bq:// prefix for BigQuery-managed Iceberg tables
753                    let is_bq_catalog_federation = warehouse_path.starts_with("bq://");
754                    let url = Url::parse(warehouse_path);
755                    if url.is_err() || is_s3_tables || is_bq_catalog_federation {
756                        // For a REST catalog, the warehouse_path could be a warehouse name rather than a URL.
757                        // In this case, leave the location unset and let the catalog assign it when creating the table.
758                        if config.common.catalog_type() == "rest"
759                            || config.common.catalog_type() == "rest_rust"
760                        {
761                            None
762                        } else {
763                            bail!(format!("Invalid warehouse path: {}", warehouse_path))
764                        }
765                    } else if warehouse_path.ends_with('/') {
766                        Some(format!("{}{}", warehouse_path, names.join("/")))
767                    } else {
768                        Some(format!("{}/{}", warehouse_path, names.join("/")))
769                    }
770                }
771                None => None,
772            }
773        };
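        // Illustrative examples of the location derived above (warehouse paths are hypothetical):
        //   warehouse_path = "s3://bucket/warehouse",  table = db.tbl  ->  "s3://bucket/warehouse/db/tbl"
        //   warehouse_path = "s3://bucket/warehouse/", table = db.tbl  ->  "s3://bucket/warehouse/db/tbl"
        //   non-URL warehouse name with a REST catalog                 ->  None (the catalog assigns the location)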
774
775        let partition_spec = match &config.partition_by {
776            Some(partition_by) => {
777                let mut partition_fields = Vec::<UnboundPartitionField>::new();
778                for (i, (column, transform)) in parse_partition_by_exprs(partition_by.clone())?
779                    .into_iter()
780                    .enumerate()
781                {
782                    match iceberg_schema.field_id_by_name(&column) {
783                        Some(id) => partition_fields.push(
784                            UnboundPartitionField::builder()
785                                .source_id(id)
786                                .transform(transform)
787                                .name(format!("_p_{}", column))
788                                .field_id(PARTITION_DATA_ID_START + i as i32)
789                                .build(),
790                        ),
791                        None => bail!(format!(
792                            "Partition source column does not exist in schema: {}",
793                            column
794                        )),
795                    };
796                }
797                Some(
798                    UnboundPartitionSpec::builder()
799                        .with_spec_id(0)
800                        .add_partition_fields(partition_fields)
801                        .map_err(|e| SinkError::Iceberg(anyhow!(e)))
802                        .context("failed to add partition columns")?
803                        .build(),
804                )
805            }
806            None => None,
807        };
808
809        // Put format-version into the table properties, because catalogs like JDBC extract the format version from the table properties.
810        let properties = HashMap::from([(
811            TableProperties::PROPERTY_FORMAT_VERSION.to_owned(),
812            (config.format_version as u8).to_string(),
813        )]);
814
815        let table_creation_builder = TableCreation::builder()
816            .name(config.table.table_name().to_owned())
817            .schema(iceberg_schema)
818            .format_version(config.table_format_version())
819            .properties(properties);
820
821        let table_creation = match (location, partition_spec) {
822            (Some(location), Some(partition_spec)) => table_creation_builder
823                .location(location)
824                .partition_spec(partition_spec)
825                .build(),
826            (Some(location), None) => table_creation_builder.location(location).build(),
827            (None, Some(partition_spec)) => table_creation_builder
828                .partition_spec(partition_spec)
829                .build(),
830            (None, None) => table_creation_builder.build(),
831        };
832
833        catalog
834            .create_table(&namespace, table_creation)
835            .await
836            .map_err(|e| SinkError::Iceberg(anyhow!(e)))
837            .context("failed to create iceberg table")?;
838    }
839    Ok(())
840}
841
842impl IcebergSink {
843    pub async fn create_and_validate_table(&self) -> Result<Table> {
844        create_and_validate_table_impl(&self.config, &self.param).await
845    }
846
847    pub async fn create_table_if_not_exists(&self) -> Result<()> {
848        create_table_if_not_exists_impl(&self.config, &self.param).await
849    }
850
851    pub fn new(config: IcebergConfig, param: SinkParam) -> Result<Self> {
852        let unique_column_ids = if config.r#type == SINK_TYPE_UPSERT && !config.force_append_only {
853            if let Some(pk) = &config.primary_key {
854                let mut unique_column_ids = Vec::with_capacity(pk.len());
855                for col_name in pk {
856                    let id = param
857                        .columns
858                        .iter()
859                        .find(|col| col.name.as_str() == col_name)
860                        .ok_or_else(|| {
861                            SinkError::Config(anyhow!(
862                                "Primary key column {} not found in sink schema",
863                                col_name
864                            ))
865                        })?
866                        .column_id
867                        .get_id() as usize;
868                    unique_column_ids.push(id);
869                }
870                Some(unique_column_ids)
871            } else {
872                unreachable!()
873            }
874        } else {
875            None
876        };
877        Ok(Self {
878            config,
879            param,
880            unique_column_ids,
881        })
882    }
883}
884
885impl Sink for IcebergSink {
886    type LogSinker = CoordinatedLogSinker<IcebergSinkWriter>;
887
888    const SINK_NAME: &'static str = ICEBERG_SINK;
889
890    async fn validate(&self) -> Result<()> {
891        if "snowflake".eq_ignore_ascii_case(self.config.catalog_type()) {
892            bail!("Snowflake catalog only supports iceberg sources");
893        }
894
895        if "glue".eq_ignore_ascii_case(self.config.catalog_type()) {
896            risingwave_common::license::Feature::IcebergSinkWithGlue
897                .check_available()
898                .map_err(|e| anyhow::anyhow!(e))?;
899        }
900
901        // Enforce merge-on-read for append-only tables
902        IcebergConfig::validate_append_only_write_mode(
903            &self.config.r#type,
904            self.config.write_mode,
905        )?;
906
907        // Validate compaction type configuration
908        let compaction_type = self.config.compaction_type();
909
910        // Check COW mode constraints
911        // COW mode only supports 'full' compaction type
912        if self.config.write_mode == IcebergWriteMode::CopyOnWrite
913            && compaction_type != CompactionType::Full
914        {
915            bail!(
916                "'copy-on-write' mode only supports 'full' compaction type, got: '{}'",
917                compaction_type
918            );
919        }
920
921        match compaction_type {
922            CompactionType::SmallFiles => {
923                // 1. check license
924                risingwave_common::license::Feature::IcebergCompaction
925                    .check_available()
926                    .map_err(|e| anyhow::anyhow!(e))?;
927
928                // 2. check write mode
929                if self.config.write_mode != IcebergWriteMode::MergeOnRead {
930                    bail!(
931                        "'small-files' compaction type only supports 'merge-on-read' write mode, got: '{}'",
932                        self.config.write_mode
933                    );
934                }
935
936                // 3. check conflicting parameters
937                if self.config.delete_files_count_threshold.is_some() {
938                    bail!(
939                        "`compaction.delete-files-count-threshold` is not supported for 'small-files' compaction type"
940                    );
941                }
942            }
943            CompactionType::FilesWithDelete => {
944                // 1. check license
945                risingwave_common::license::Feature::IcebergCompaction
946                    .check_available()
947                    .map_err(|e| anyhow::anyhow!(e))?;
948
949                // 2. check write mode
950                if self.config.write_mode != IcebergWriteMode::MergeOnRead {
951                    bail!(
952                        "'files-with-delete' compaction type only supports 'merge-on-read' write mode, got: '{}'",
953                        self.config.write_mode
954                    );
955                }
956
957                // 3. check conflicting parameters
958                if self.config.small_files_threshold_mb.is_some() {
959                    bail!(
960                        "`compaction.small-files-threshold-mb` must not be set for 'files-with-delete' compaction type"
961                    );
962                }
963            }
964            CompactionType::Full => {
965                // Full compaction has no special requirements
966            }
967        }
968
969        let _ = self.create_and_validate_table().await?;
970        Ok(())
971    }
972
973    fn support_schema_change() -> bool {
974        true
975    }
976
977    fn validate_alter_config(config: &BTreeMap<String, String>) -> Result<()> {
978        let iceberg_config = IcebergConfig::from_btreemap(config.clone())?;
979
980        // Validate compaction interval
981        if let Some(compaction_interval) = iceberg_config.compaction_interval_sec {
982            if iceberg_config.enable_compaction && compaction_interval == 0 {
983                bail!(
984                    "`compaction-interval-sec` must be greater than 0 when `enable-compaction` is true"
985                );
986            }
987
988            // log compaction_interval
989            tracing::info!(
990                "Alter config compaction_interval set to {} seconds",
991                compaction_interval
992            );
993        }
994
995        // Validate max snapshots
996        if let Some(max_snapshots) = iceberg_config.max_snapshots_num_before_compaction
997            && max_snapshots < 1
998        {
999            bail!(
1000                "`compaction.max-snapshots-num` must be greater than 0, got: {}",
1001                max_snapshots
1002            );
1003        }
1004
1005        Ok(())
1006    }
1007
1008    async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
1009        let writer = IcebergSinkWriter::new(
1010            self.config.clone(),
1011            self.param.clone(),
1012            writer_param.clone(),
1013            self.unique_column_ids.clone(),
1014        );
1015
1016        let commit_checkpoint_interval =
1017            NonZeroU64::new(self.config.commit_checkpoint_interval).expect(
1018                "commit_checkpoint_interval should be greater than 0, and it should be checked in config validation",
1019            );
1020        let log_sinker = CoordinatedLogSinker::new(
1021            &writer_param,
1022            self.param.clone(),
1023            writer,
1024            commit_checkpoint_interval,
1025        )
1026        .await?;
1027
1028        Ok(log_sinker)
1029    }
1030
1031    fn is_coordinated_sink(&self) -> bool {
1032        true
1033    }
1034
1035    async fn new_coordinator(
1036        &self,
1037        iceberg_compact_stat_sender: Option<UnboundedSender<IcebergSinkCompactionUpdate>>,
1038    ) -> Result<SinkCommitCoordinator> {
1039        let catalog = self.config.create_catalog().await?;
1040        let table = self.create_and_validate_table().await?;
1041        let coordinator = IcebergSinkCommitter {
1042            catalog,
1043            table,
1044            last_commit_epoch: 0,
1045            sink_id: self.param.sink_id,
1046            config: self.config.clone(),
1047            param: self.param.clone(),
1048            commit_retry_num: self.config.commit_retry_num,
1049            iceberg_compact_stat_sender,
1050        };
1051        if self.config.is_exactly_once.unwrap_or_default() {
1052            Ok(SinkCommitCoordinator::TwoPhase(Box::new(coordinator)))
1053        } else {
1054            Ok(SinkCommitCoordinator::SinglePhase(Box::new(coordinator)))
1055        }
1056    }
1057}
1058
1059/// `None` means no projection is needed.
1060/// `Prepare` holds the index of the extra partition column.
1061/// `Done` holds the resolved projection index vec.
1062///
1063/// The `ProjectIdxVec` is evaluated lazily: the first time we see the `Prepare` state, we use the
1064/// data chunk schema to build the projection index vec.
1065enum ProjectIdxVec {
1066    None,
1067    Prepare(usize),
1068    Done(Vec<usize>),
1069}
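// Sketch of the lazy evaluation above, assuming a 5-column data chunk whose extra
// partition column sits at index 3 (the indices are hypothetical):
//
//   ProjectIdxVec::Prepare(3)  --first chunk seen-->  ProjectIdxVec::Done(vec![0, 1, 2, 4])
//
// i.e. the resolved projection keeps every column except the extra partition column.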
1070
1071type DataFileWriterBuilderType =
1072    DataFileWriterBuilder<ParquetWriterBuilder, DefaultLocationGenerator, DefaultFileNameGenerator>;
1073type PositionDeleteFileWriterBuilderType = PositionDeleteFileWriterBuilder<
1074    ParquetWriterBuilder,
1075    DefaultLocationGenerator,
1076    DefaultFileNameGenerator,
1077>;
1078type PositionDeleteFileWriterType = PositionDeleteFileWriter<
1079    ParquetWriterBuilder,
1080    DefaultLocationGenerator,
1081    DefaultFileNameGenerator,
1082>;
1083type DeletionVectorWriterBuilderType =
1084    DeletionVectorWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>;
1085type DeletionVectorWriterType =
1086    DeletionVectorWriter<DefaultLocationGenerator, DefaultFileNameGenerator>;
1087type EqualityDeleteFileWriterBuilderType = EqualityDeleteFileWriterBuilder<
1088    ParquetWriterBuilder,
1089    DefaultLocationGenerator,
1090    DefaultFileNameGenerator,
1091>;
1092
1093#[derive(Clone)]
1094enum PositionDeleteWriterBuilderType {
1095    PositionDelete(PositionDeleteFileWriterBuilderType),
1096    DeletionVector(DeletionVectorWriterBuilderType),
1097}
1098
1099enum PositionDeleteWriterType {
1100    PositionDelete(PositionDeleteFileWriterType),
1101    DeletionVector(DeletionVectorWriterType),
1102}
1103
1104#[async_trait]
1105impl IcebergWriterBuilder<Vec<PositionDeleteInput>> for PositionDeleteWriterBuilderType {
1106    type R = PositionDeleteWriterType;
1107
1108    async fn build(
1109        self,
1110        partition_key: Option<iceberg::spec::PartitionKey>,
1111    ) -> iceberg::Result<Self::R> {
1112        match self {
1113            PositionDeleteWriterBuilderType::PositionDelete(builder) => Ok(
1114                PositionDeleteWriterType::PositionDelete(builder.build(partition_key).await?),
1115            ),
1116            PositionDeleteWriterBuilderType::DeletionVector(builder) => Ok(
1117                PositionDeleteWriterType::DeletionVector(builder.build(partition_key).await?),
1118            ),
1119        }
1120    }
1121}
1122
1123#[async_trait]
1124impl IcebergWriter<Vec<PositionDeleteInput>> for PositionDeleteWriterType {
1125    async fn write(&mut self, input: Vec<PositionDeleteInput>) -> iceberg::Result<()> {
1126        match self {
1127            PositionDeleteWriterType::PositionDelete(writer) => writer.write(input).await,
1128            PositionDeleteWriterType::DeletionVector(writer) => writer.write(input).await,
1129        }
1130    }
1131
1132    async fn close(&mut self) -> iceberg::Result<Vec<DataFile>> {
1133        match self {
1134            PositionDeleteWriterType::PositionDelete(writer) => writer.close().await,
1135            PositionDeleteWriterType::DeletionVector(writer) => writer.close().await,
1136        }
1137    }
1138}
1139
1140#[derive(Clone)]
1141struct TaskWriterBuilderWrapper<B: IcebergWriterBuilder> {
1142    inner: B,
1143    fanout_enabled: bool,
1144    schema: IcebergSchemaRef,
1145    partition_spec: PartitionSpecRef,
1146    compute_partition: bool,
1147}
1148
1149impl<B: IcebergWriterBuilder> TaskWriterBuilderWrapper<B> {
1150    fn new(
1151        inner: B,
1152        fanout_enabled: bool,
1153        schema: IcebergSchemaRef,
1154        partition_spec: PartitionSpecRef,
1155        compute_partition: bool,
1156    ) -> Self {
1157        Self {
1158            inner,
1159            fanout_enabled,
1160            schema,
1161            partition_spec,
1162            compute_partition,
1163        }
1164    }
1165
1166    fn build(self) -> iceberg::Result<TaskWriter<B>> {
1167        let partition_splitter = match (
1168            self.partition_spec.is_unpartitioned(),
1169            self.compute_partition,
1170        ) {
1171            (true, _) => None,
1172            (false, true) => Some(RecordBatchPartitionSplitter::new_with_computed_values(
1173                self.schema.clone(),
1174                self.partition_spec.clone(),
1175            )?),
1176            (false, false) => Some(RecordBatchPartitionSplitter::new_with_precomputed_values(
1177                self.schema.clone(),
1178                self.partition_spec.clone(),
1179            )?),
1180        };
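        // Summary of the splitter selection above:
        //   unpartitioned table                      -> no partition splitter
        //   partitioned, compute_partition == true   -> splitter computes partition values from the batch
        //   partitioned, compute_partition == false  -> splitter reads precomputed partition values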
1181
1182        Ok(TaskWriter::new_with_partition_splitter(
1183            self.inner,
1184            self.fanout_enabled,
1185            self.schema,
1186            self.partition_spec,
1187            partition_splitter,
1188        ))
1189    }
1190}
1191
1192pub enum IcebergSinkWriter {
1193    Created(IcebergSinkWriterArgs),
1194    Initialized(IcebergSinkWriterInner),
1195}
1196
1197pub struct IcebergSinkWriterArgs {
1198    config: IcebergConfig,
1199    sink_param: SinkParam,
1200    writer_param: SinkWriterParam,
1201    unique_column_ids: Option<Vec<usize>>,
1202}
1203
1204pub struct IcebergSinkWriterInner {
1205    writer: IcebergWriterDispatch,
1206    arrow_schema: SchemaRef,
1207    // See comments below
1208    metrics: IcebergWriterMetrics,
1209    // State of iceberg table for this writer
1210    table: Table,
1211    // For chunks with an extra partition column, we should remove that column before writing.
1212    // This projection index vec is cached to avoid recomputing the projection for every chunk.
1213    project_idx_vec: ProjectIdxVec,
1214}
1215
1216#[allow(clippy::type_complexity)]
1217enum IcebergWriterDispatch {
1218    Append {
1219        writer: Option<Box<dyn IcebergWriter>>,
1220        writer_builder:
1221            TaskWriterBuilderWrapper<MonitoredGeneralWriterBuilder<DataFileWriterBuilderType>>,
1222    },
1223    Upsert {
1224        writer: Option<Box<dyn IcebergWriter>>,
1225        writer_builder: TaskWriterBuilderWrapper<
1226            MonitoredGeneralWriterBuilder<
1227                DeltaWriterBuilder<
1228                    DataFileWriterBuilderType,
1229                    PositionDeleteWriterBuilderType,
1230                    EqualityDeleteFileWriterBuilderType,
1231                >,
1232            >,
1233        >,
1234        arrow_schema_with_op_column: SchemaRef,
1235    },
1236}
1237
1238impl IcebergWriterDispatch {
1239    pub fn get_writer(&mut self) -> Option<&mut Box<dyn IcebergWriter>> {
1240        match self {
1241            IcebergWriterDispatch::Append { writer, .. }
1242            | IcebergWriterDispatch::Upsert { writer, .. } => writer.as_mut(),
1243        }
1244    }
1245}
1246
1247pub struct IcebergWriterMetrics {
1248    // NOTE: These 2 metrics are not used directly by us, but only kept for lifecycle management.
1249    // They are actually used in `PrometheusWriterBuilder`:
1250    //     WriterMetrics::new(write_qps.deref().clone(), write_latency.deref().clone())
1251    // We keep them here so that, when dropped, the guard cleans up the labels from the metrics registry.
1252    _write_qps: LabelGuardedIntCounter,
1253    _write_latency: LabelGuardedHistogram,
1254    write_bytes: LabelGuardedIntCounter,
1255}
1256
1257impl IcebergSinkWriter {
1258    pub fn new(
1259        config: IcebergConfig,
1260        sink_param: SinkParam,
1261        writer_param: SinkWriterParam,
1262        unique_column_ids: Option<Vec<usize>>,
1263    ) -> Self {
1264        Self::Created(IcebergSinkWriterArgs {
1265            config,
1266            sink_param,
1267            writer_param,
1268            unique_column_ids,
1269        })
1270    }
1271}
1272
1273impl IcebergSinkWriterInner {
1274    fn build_append_only(table: Table, writer_param: &SinkWriterParam) -> Result<Self> {
1275        let SinkWriterParam {
1276            extra_partition_col_idx,
1277            actor_id,
1278            sink_id,
1279            sink_name,
1280            ..
1281        } = writer_param;
1282        let metrics_labels = [
1283            &actor_id.to_string(),
1284            &sink_id.to_string(),
1285            sink_name.as_str(),
1286        ];
1287
1288        // Metrics
1289        let write_qps = GLOBAL_SINK_METRICS
1290            .iceberg_write_qps
1291            .with_guarded_label_values(&metrics_labels);
1292        let write_latency = GLOBAL_SINK_METRICS
1293            .iceberg_write_latency
1294            .with_guarded_label_values(&metrics_labels);
1295        // # TODO
1296        // Unused. Add this metric later.
1297        let _rolling_unflushed_data_file = GLOBAL_SINK_METRICS
1298            .iceberg_rolling_unflushed_data_file
1299            .with_guarded_label_values(&metrics_labels);
1300        let write_bytes = GLOBAL_SINK_METRICS
1301            .iceberg_write_bytes
1302            .with_guarded_label_values(&metrics_labels);
1303
1304        let schema = table.metadata().current_schema();
1305        let partition_spec = table.metadata().default_partition_spec();
1306        let fanout_enabled = !partition_spec.fields().is_empty();
1307        // To avoid duplicate file names, each sink generates a unique UUID as a file name suffix when created.
1308        let unique_uuid_suffix = Uuid::now_v7();
1309
1310        let parquet_writer_properties = WriterProperties::builder()
1311            .set_max_row_group_size(
1312                writer_param
1313                    .streaming_config
1314                    .developer
1315                    .iceberg_sink_write_parquet_max_row_group_rows,
1316            )
1317            .build();
1318
1319        let parquet_writer_builder =
1320            ParquetWriterBuilder::new(parquet_writer_properties, schema.clone());
1321        let rolling_builder = RollingFileWriterBuilder::new_with_default_file_size(
1322            parquet_writer_builder,
1323            table.file_io().clone(),
1324            DefaultLocationGenerator::new(table.metadata().clone())
1325                .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1326            DefaultFileNameGenerator::new(
1327                writer_param.actor_id.to_string(),
1328                Some(unique_uuid_suffix.to_string()),
1329                iceberg::spec::DataFileFormat::Parquet,
1330            ),
1331        );
1332        let data_file_builder = DataFileWriterBuilder::new(rolling_builder);
1333        let monitored_builder = MonitoredGeneralWriterBuilder::new(
1334            data_file_builder,
1335            write_qps.clone(),
1336            write_latency.clone(),
1337        );
1338        let writer_builder = TaskWriterBuilderWrapper::new(
1339            monitored_builder,
1340            fanout_enabled,
1341            schema.clone(),
1342            partition_spec.clone(),
1343            true,
1344        );
1345        let inner_writer = Some(Box::new(
1346            writer_builder
1347                .clone()
1348                .build()
1349                .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1350        ) as Box<dyn IcebergWriter>);
1351        Ok(Self {
1352            arrow_schema: Arc::new(
1353                schema_to_arrow_schema(table.metadata().current_schema())
1354                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1355            ),
1356            metrics: IcebergWriterMetrics {
1357                _write_qps: write_qps,
1358                _write_latency: write_latency,
1359                write_bytes,
1360            },
1361            writer: IcebergWriterDispatch::Append {
1362                writer: inner_writer,
1363                writer_builder,
1364            },
1365            table,
1366            project_idx_vec: {
1367                if let Some(extra_partition_col_idx) = extra_partition_col_idx {
1368                    ProjectIdxVec::Prepare(*extra_partition_col_idx)
1369                } else {
1370                    ProjectIdxVec::None
1371                }
1372            },
1373        })
1374    }
1375
1376    fn build_upsert(
1377        table: Table,
1378        unique_column_ids: Vec<usize>,
1379        writer_param: &SinkWriterParam,
1380    ) -> Result<Self> {
1381        let SinkWriterParam {
1382            extra_partition_col_idx,
1383            actor_id,
1384            sink_id,
1385            sink_name,
1386            ..
1387        } = writer_param;
1388        let metrics_labels = [
1389            &actor_id.to_string(),
1390            &sink_id.to_string(),
1391            sink_name.as_str(),
1392        ];
1393        let unique_column_ids: Vec<_> = unique_column_ids.into_iter().map(|id| id as i32).collect();
1394
1395        // Metrics
1396        let write_qps = GLOBAL_SINK_METRICS
1397            .iceberg_write_qps
1398            .with_guarded_label_values(&metrics_labels);
1399        let write_latency = GLOBAL_SINK_METRICS
1400            .iceberg_write_latency
1401            .with_guarded_label_values(&metrics_labels);
1402        // # TODO
1403        // Unused. Add this metric later.
1404        let _rolling_unflushed_data_file = GLOBAL_SINK_METRICS
1405            .iceberg_rolling_unflushed_data_file
1406            .with_guarded_label_values(&metrics_labels);
1407        let write_bytes = GLOBAL_SINK_METRICS
1408            .iceberg_write_bytes
1409            .with_guarded_label_values(&metrics_labels);
1410
1411        // Determine the current schema and the default partition spec.
1412        let schema = table.metadata().current_schema();
1413        let partition_spec = table.metadata().default_partition_spec();
1414        let fanout_enabled = !partition_spec.fields().is_empty();
1415        let use_deletion_vectors = table.metadata().format_version() >= FormatVersion::V3;
1416
1417        // To avoid duplicate file names, each sink generates a unique UUID as a file name suffix when created.
1418        let unique_uuid_suffix = Uuid::now_v7();
1419
1420        let parquet_writer_properties = WriterProperties::builder()
1421            .set_max_row_group_size(
1422                writer_param
1423                    .streaming_config
1424                    .developer
1425                    .iceberg_sink_write_parquet_max_row_group_rows,
1426            )
1427            .build();
1428
1429        let data_file_builder = {
1430            let parquet_writer_builder =
1431                ParquetWriterBuilder::new(parquet_writer_properties.clone(), schema.clone());
1432            let rolling_writer_builder = RollingFileWriterBuilder::new_with_default_file_size(
1433                parquet_writer_builder,
1434                table.file_io().clone(),
1435                DefaultLocationGenerator::new(table.metadata().clone())
1436                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1437                DefaultFileNameGenerator::new(
1438                    writer_param.actor_id.to_string(),
1439                    Some(unique_uuid_suffix.to_string()),
1440                    iceberg::spec::DataFileFormat::Parquet,
1441                ),
1442            );
1443            DataFileWriterBuilder::new(rolling_writer_builder)
1444        };
1445        let position_delete_builder = if use_deletion_vectors {
1446            let location_generator = DefaultLocationGenerator::new(table.metadata().clone())
1447                .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
1448            PositionDeleteWriterBuilderType::DeletionVector(DeletionVectorWriterBuilder::new(
1449                table.file_io().clone(),
1450                location_generator,
1451                DefaultFileNameGenerator::new(
1452                    writer_param.actor_id.to_string(),
1453                    Some(format!("delvec-{}", unique_uuid_suffix)),
1454                    iceberg::spec::DataFileFormat::Puffin,
1455                ),
1456            ))
1457        } else {
1458            let parquet_writer_builder = ParquetWriterBuilder::new(
1459                parquet_writer_properties.clone(),
1460                POSITION_DELETE_SCHEMA.clone().into(),
1461            );
1462            let rolling_writer_builder = RollingFileWriterBuilder::new_with_default_file_size(
1463                parquet_writer_builder,
1464                table.file_io().clone(),
1465                DefaultLocationGenerator::new(table.metadata().clone())
1466                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1467                DefaultFileNameGenerator::new(
1468                    writer_param.actor_id.to_string(),
1469                    Some(format!("pos-del-{}", unique_uuid_suffix)),
1470                    iceberg::spec::DataFileFormat::Parquet,
1471                ),
1472            );
1473            PositionDeleteWriterBuilderType::PositionDelete(PositionDeleteFileWriterBuilder::new(
1474                rolling_writer_builder,
1475            ))
1476        };
1477        let equality_delete_builder = {
1478            let config = EqualityDeleteWriterConfig::new(
1479                unique_column_ids.clone(),
1480                table.metadata().current_schema().clone(),
1481            )
1482            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
1483            let parquet_writer_builder = ParquetWriterBuilder::new(
1484                parquet_writer_properties,
1485                Arc::new(
1486                    arrow_schema_to_schema(config.projected_arrow_schema_ref())
1487                        .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1488                ),
1489            );
1490            let rolling_writer_builder = RollingFileWriterBuilder::new_with_default_file_size(
1491                parquet_writer_builder,
1492                table.file_io().clone(),
1493                DefaultLocationGenerator::new(table.metadata().clone())
1494                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1495                DefaultFileNameGenerator::new(
1496                    writer_param.actor_id.to_string(),
1497                    Some(format!("eq-del-{}", unique_uuid_suffix)),
1498                    iceberg::spec::DataFileFormat::Parquet,
1499                ),
1500            );
1501
1502            EqualityDeleteFileWriterBuilder::new(rolling_writer_builder, config)
1503        };
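        // The delta writer combines the data-file, position-delete, and equality-delete writers;
        // each incoming row is routed according to the extra `op` column appended below.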
1504        let delta_builder = DeltaWriterBuilder::new(
1505            data_file_builder,
1506            position_delete_builder,
1507            equality_delete_builder,
1508            unique_column_ids,
1509            schema.clone(),
1510        );
1511        let original_arrow_schema = Arc::new(
1512            schema_to_arrow_schema(table.metadata().current_schema())
1513                .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1514        );
1515        let schema_with_extra_op_column = {
1516            let mut new_fields = original_arrow_schema.fields().iter().cloned().collect_vec();
1517            new_fields.push(Arc::new(ArrowField::new(
1518                "op".to_owned(),
1519                ArrowDataType::Int32,
1520                false,
1521            )));
1522            Arc::new(ArrowSchema::new(new_fields))
1523        };
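        // Wrap the delta writer with write metrics and, when the table is partitioned, fanout
        // partitioning so that rows are split across per-partition writers.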
1524        let writer_builder = TaskWriterBuilderWrapper::new(
1525            MonitoredGeneralWriterBuilder::new(
1526                delta_builder,
1527                write_qps.clone(),
1528                write_latency.clone(),
1529            ),
1530            fanout_enabled,
1531            schema.clone(),
1532            partition_spec.clone(),
1533            true,
1534        );
1535        let inner_writer = Some(Box::new(
1536            writer_builder
1537                .clone()
1538                .build()
1539                .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1540        ) as Box<dyn IcebergWriter>);
1541        Ok(Self {
1542            arrow_schema: original_arrow_schema,
1543            metrics: IcebergWriterMetrics {
1544                _write_qps: write_qps,
1545                _write_latency: write_latency,
1546                write_bytes,
1547            },
1548            table,
1549            writer: IcebergWriterDispatch::Upsert {
1550                writer: inner_writer,
1551                writer_builder,
1552                arrow_schema_with_op_column: schema_with_extra_op_column,
1553            },
1554            project_idx_vec: {
1555                if let Some(extra_partition_col_idx) = extra_partition_col_idx {
1556                    ProjectIdxVec::Prepare(*extra_partition_col_idx)
1557                } else {
1558                    ProjectIdxVec::None
1559                }
1560            },
1561        })
1562    }
1563}
1564
1565#[async_trait]
1566impl SinkWriter for IcebergSinkWriter {
1567    type CommitMetadata = Option<SinkMetadata>;
1568
1569    /// Begin a new epoch
1570    async fn begin_epoch(&mut self, _epoch: u64) -> Result<()> {
1571        let Self::Created(args) = self else {
1572            return Ok(());
1573        };
1574
1575        let table = create_and_validate_table_impl(&args.config, &args.sink_param).await?;
1576        let inner = match &args.unique_column_ids {
1577            Some(unique_column_ids) => IcebergSinkWriterInner::build_upsert(
1578                table,
1579                unique_column_ids.clone(),
1580                &args.writer_param,
1581            )?,
1582            None => IcebergSinkWriterInner::build_append_only(table, &args.writer_param)?,
1583        };
1584
1585        *self = IcebergSinkWriter::Initialized(inner);
1586        Ok(())
1587    }
1588
1589    /// Write a stream chunk to sink
1590    async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> {
1591        let Self::Initialized(inner) = self else {
1592            unreachable!("IcebergSinkWriter should be initialized before write_batch");
1593        };
1594
1595        // Try to build writer if it's None.
1596        match &mut inner.writer {
1597            IcebergWriterDispatch::Append {
1598                writer,
1599                writer_builder,
1600            } => {
1601                if writer.is_none() {
1602                    *writer = Some(Box::new(
1603                        writer_builder
1604                            .clone()
1605                            .build()
1606                            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1607                    ));
1608                }
1609            }
1610            IcebergWriterDispatch::Upsert {
1611                writer,
1612                writer_builder,
1613                ..
1614            } => {
1615                if writer.is_none() {
1616                    *writer = Some(Box::new(
1617                        writer_builder
1618                            .clone()
1619                            .build()
1620                            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1621                    ));
1622                }
1623            }
1624        };
1625
1626        // Process the chunk.
1627        let (mut chunk, ops) = chunk.compact_vis().into_parts();
1628        match &mut inner.project_idx_vec {
1629            ProjectIdxVec::None => {}
1630            ProjectIdxVec::Prepare(idx) => {
1631                if *idx >= chunk.columns().len() {
1632                    return Err(SinkError::Iceberg(anyhow!(
1633                        "invalid extra partition column index {}",
1634                        idx
1635                    )));
1636                }
1637                let project_idx_vec = (0..*idx)
1638                    .chain(*idx + 1..chunk.columns().len())
1639                    .collect_vec();
1640                chunk = chunk.project(&project_idx_vec);
1641                inner.project_idx_vec = ProjectIdxVec::Done(project_idx_vec);
1642            }
1643            ProjectIdxVec::Done(idx_vec) => {
1644                chunk = chunk.project(idx_vec);
1645            }
1646        }
1647        if ops.is_empty() {
1648            return Ok(());
1649        }
1650        let write_batch_size = chunk.estimated_heap_size();
1651        let batch = match &inner.writer {
1652            IcebergWriterDispatch::Append { .. } => {
1653                // Keep only the insert rows for the append-only sink.
1654                let filters =
1655                    chunk.visibility() & ops.iter().map(|op| *op == Op::Insert).collect::<Bitmap>();
1656                chunk.set_visibility(filters);
1657                IcebergArrowConvert
1658                    .to_record_batch(inner.arrow_schema.clone(), &chunk.compact_vis())
1659                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?
1660            }
1661            IcebergWriterDispatch::Upsert {
1662                arrow_schema_with_op_column,
1663                ..
1664            } => {
1665                let chunk = IcebergArrowConvert
1666                    .to_record_batch(inner.arrow_schema.clone(), &chunk)
1667                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
1668                let ops = Arc::new(Int32Array::from(
1669                    ops.iter()
1670                        .map(|op| match op {
1671                            Op::UpdateInsert | Op::Insert => INSERT_OP,
1672                            Op::UpdateDelete | Op::Delete => DELETE_OP,
1673                        })
1674                        .collect_vec(),
1675                ));
1676                let mut columns = chunk.columns().to_vec();
1677                columns.push(ops);
1678                RecordBatch::try_new(arrow_schema_with_op_column.clone(), columns)
1679                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?
1680            }
1681        };
1682
1683        let writer = inner.writer.get_writer().unwrap();
1684        writer
1685            .write(batch)
1686            .instrument_await("iceberg_write")
1687            .await
1688            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
1689        inner.metrics.write_bytes.inc_by(write_batch_size as _);
1690        Ok(())
1691    }
1692
1693    /// Receive a barrier and mark the end of current epoch. When `is_checkpoint` is true, the sink
1694    /// writer should commit the current epoch.
1695    async fn barrier(&mut self, is_checkpoint: bool) -> Result<Option<SinkMetadata>> {
1696        let Self::Initialized(inner) = self else {
1697            unreachable!("IcebergSinkWriter should be initialized before barrier");
1698        };
1699
1700        // Skip it if not checkpoint
1701        if !is_checkpoint {
1702            return Ok(None);
1703        }
1704
1705        let close_result = match &mut inner.writer {
1706            IcebergWriterDispatch::Append {
1707                writer,
1708                writer_builder,
1709            } => {
1710                let close_result = match writer.take() {
1711                    Some(mut writer) => {
1712                        Some(writer.close().instrument_await("iceberg_close").await)
1713                    }
1714                    _ => None,
1715                };
1716                match writer_builder.clone().build() {
1717                    Ok(new_writer) => {
1718                        *writer = Some(Box::new(new_writer));
1719                    }
1720                    _ => {
1721                        // The writer has been closed but we failed to build a new one. We can't return the error
1722                        // here because the current writer may have closed successfully, so we just log it instead.
1723                        warn!("Failed to build new writer after close");
1724                    }
1725                }
1726                close_result
1727            }
1728            IcebergWriterDispatch::Upsert {
1729                writer,
1730                writer_builder,
1731                ..
1732            } => {
1733                let close_result = match writer.take() {
1734                    Some(mut writer) => {
1735                        Some(writer.close().instrument_await("iceberg_close").await)
1736                    }
1737                    _ => None,
1738                };
1739                match writer_builder.clone().build() {
1740                    Ok(new_writer) => {
1741                        *writer = Some(Box::new(new_writer));
1742                    }
1743                    _ => {
1744                        // The writer has been closed but we failed to build a new one. We can't return the error
1745                        // here because the current writer may have closed successfully, so we just log it instead.
1746                        warn!("Failed to build new writer after close");
1747                    }
1748                }
1749                close_result
1750            }
1751        };
1752
1753        match close_result {
1754            Some(Ok(result)) => {
1755                let format_version = inner.table.metadata().format_version();
1756                let partition_type = inner.table.metadata().default_partition_type();
1757                let data_files = result
1758                    .into_iter()
1759                    .map(|f| {
1760                        // Truncate large column statistics BEFORE serialization
1761                        let truncated = truncate_datafile(f);
1762                        SerializedDataFile::try_from(truncated, partition_type, format_version)
1763                            .map_err(|err| SinkError::Iceberg(anyhow!(err)))
1764                    })
1765                    .collect::<Result<Vec<_>>>()?;
1766                Ok(Some(SinkMetadata::try_from(&IcebergCommitResult {
1767                    data_files,
1768                    schema_id: inner.table.metadata().current_schema_id(),
1769                    partition_spec_id: inner.table.metadata().default_partition_spec_id(),
1770                })?))
1771            }
1772            Some(Err(err)) => Err(SinkError::Iceberg(anyhow!(err))),
1773            None => Err(SinkError::Iceberg(anyhow!("No writer to close"))),
1774        }
1775    }
1776}
1777
1778const SCHEMA_ID: &str = "schema_id";
1779const PARTITION_SPEC_ID: &str = "partition_spec_id";
1780const DATA_FILES: &str = "data_files";
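// Illustrative shape of the serialized commit metadata assembled below (values are hypothetical):
// {
//   "schema_id": 0,
//   "partition_spec_id": 0,
//   "data_files": [ /* serialized `SerializedDataFile` objects */ ]
// }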
1781
1782/// Maximum size for column statistics (min/max values) in bytes.
1783/// Column statistics larger than this will be truncated to avoid metadata bloat.
1784/// This is especially important for large fields like JSONB, TEXT, BINARY, etc.
1785///
1786/// Fix for large column statistics in `DataFile` metadata that can cause OOM errors.
1787/// We truncate at the `DataFile` level (before serialization) by directly modifying
1788/// the public `lower_bounds` and `upper_bounds` fields.
1789///
1790/// This prevents metadata from ballooning to gigabytes when dealing with large
1791/// JSONB, TEXT, or BINARY fields, while still preserving statistics for small fields
1792/// that benefit from query optimization.
1793const MAX_COLUMN_STAT_SIZE: usize = 10240; // 10KB
1794
1795/// Truncate large column statistics from `DataFile` BEFORE serialization.
1796///
1797/// This function directly modifies `DataFile`'s `lower_bounds` and `upper_bounds`
1798/// to remove entries that exceed `MAX_COLUMN_STAT_SIZE`.
1799///
1800/// # Arguments
1801/// * `data_file` - A `DataFile` to process
1802///
1803/// # Returns
1804/// The modified `DataFile` with large statistics truncated
1805fn truncate_datafile(mut data_file: DataFile) -> DataFile {
1806    // Process lower_bounds - remove entries with large values
1807    data_file.lower_bounds.retain(|field_id, datum| {
1808        // Use to_bytes() to get the actual binary size without JSON serialization overhead
1809        let size = match datum.to_bytes() {
1810            Ok(bytes) => bytes.len(),
1811            Err(_) => 0,
1812        };
1813
1814        if size > MAX_COLUMN_STAT_SIZE {
1815            tracing::debug!(
1816                field_id = field_id,
1817                size = size,
1818                "Truncating large lower_bound statistic"
1819            );
1820            return false;
1821        }
1822        true
1823    });
1824
1825    // Process upper_bounds - remove entries with large values
1826    data_file.upper_bounds.retain(|field_id, datum| {
1827        // Use to_bytes() to get the actual binary size without JSON serialization overhead
1828        let size = match datum.to_bytes() {
1829            Ok(bytes) => bytes.len(),
1830            Err(_) => 0,
1831        };
1832
1833        if size > MAX_COLUMN_STAT_SIZE {
1834            tracing::debug!(
1835                field_id = field_id,
1836                size = size,
1837                "Truncating large upper_bound statistic"
1838            );
1839            return false;
1840        }
1841        true
1842    });
1843
1844    data_file
1845}
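
// A minimal sketch of the size-based retention rule above, using plain byte buffers in place of
// `iceberg::spec::Datum` values (the real code measures size via `Datum::to_bytes()`):
//
//     use std::collections::HashMap;
//     let mut bounds: HashMap<i32, Vec<u8>> = HashMap::new();
//     bounds.insert(1, vec![0u8; 16]);        // small statistic: retained
//     bounds.insert(2, vec![0u8; 1 << 20]);   // 1 MiB statistic: dropped
//     bounds.retain(|_, v| v.len() <= MAX_COLUMN_STAT_SIZE);
//     assert_eq!(bounds.len(), 1);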
1846
1847#[derive(Default, Clone)]
1848struct IcebergCommitResult {
1849    schema_id: i32,
1850    partition_spec_id: i32,
1851    data_files: Vec<SerializedDataFile>,
1852}
1853
1854impl IcebergCommitResult {
1855    fn try_from(value: &SinkMetadata) -> Result<Self> {
1856        if let Some(Serialized(v)) = &value.metadata {
1857            let mut values = if let serde_json::Value::Object(v) =
1858                serde_json::from_slice::<serde_json::Value>(&v.metadata)
1859                    .context("Can't parse iceberg sink metadata")?
1860            {
1861                v
1862            } else {
1863                bail!("iceberg sink metadata should be an object");
1864            };
1865
1866            let schema_id;
1867            if let Some(serde_json::Value::Number(value)) = values.remove(SCHEMA_ID) {
1868                schema_id = value
1869                    .as_u64()
1870                    .ok_or_else(|| anyhow!("schema_id should be a u64"))?;
1871            } else {
1872                bail!("iceberg sink metadata should have schema_id");
1873            }
1874
1875            let partition_spec_id;
1876            if let Some(serde_json::Value::Number(value)) = values.remove(PARTITION_SPEC_ID) {
1877                partition_spec_id = value
1878                    .as_u64()
1879                    .ok_or_else(|| anyhow!("partition_spec_id should be a u64"))?;
1880            } else {
1881                bail!("iceberg sink metadata should have partition_spec_id");
1882            }
1883
1884            let data_files: Vec<SerializedDataFile>;
1885            if let serde_json::Value::Array(values) = values
1886                .remove(DATA_FILES)
1887                .ok_or_else(|| anyhow!("iceberg sink metadata should have data_files object"))?
1888            {
1889                data_files = values
1890                    .into_iter()
1891                    .map(from_value::<SerializedDataFile>)
1892                    .collect::<std::result::Result<_, _>>()
1893                    .context("Can't deserialize data files")?;
1894            } else {
1895                bail!("iceberg sink metadata should have data_files object");
1896            }
1897
1898            Ok(Self {
1899                schema_id: schema_id as i32,
1900                partition_spec_id: partition_spec_id as i32,
1901                data_files,
1902            })
1903        } else {
1904            bail!("Can't create iceberg sink write result from empty data!")
1905        }
1906    }
1907
1908    fn try_from_serialized_bytes(value: Vec<u8>) -> Result<Self> {
1909        let mut values = if let serde_json::Value::Object(value) =
1910            serde_json::from_slice::<serde_json::Value>(&value)
1911                .context("Can't parse iceberg sink metadata")?
1912        {
1913            value
1914        } else {
1915            bail!("iceberg sink metadata should be an object");
1916        };
1917
1918        let schema_id;
1919        if let Some(serde_json::Value::Number(value)) = values.remove(SCHEMA_ID) {
1920            schema_id = value
1921                .as_u64()
1922                .ok_or_else(|| anyhow!("schema_id should be a u64"))?;
1923        } else {
1924            bail!("iceberg sink metadata should have schema_id");
1925        }
1926
1927        let partition_spec_id;
1928        if let Some(serde_json::Value::Number(value)) = values.remove(PARTITION_SPEC_ID) {
1929            partition_spec_id = value
1930                .as_u64()
1931                .ok_or_else(|| anyhow!("partition_spec_id should be a u64"))?;
1932        } else {
1933            bail!("iceberg sink metadata should have partition_spec_id");
1934        }
1935
1936        let data_files: Vec<SerializedDataFile>;
1937        if let serde_json::Value::Array(values) = values
1938            .remove(DATA_FILES)
1939            .ok_or_else(|| anyhow!("iceberg sink metadata should have data_files object"))?
1940        {
1941            data_files = values
1942                .into_iter()
1943                .map(from_value::<SerializedDataFile>)
1944                .collect::<std::result::Result<_, _>>()
1945                .context("Can't deserialize data files")?;
1946        } else {
1947            bail!("iceberg sink metadata should have data_files object");
1948        }
1949
1950        Ok(Self {
1951            schema_id: schema_id as i32,
1952            partition_spec_id: partition_spec_id as i32,
1953            data_files,
1954        })
1955    }
1956}
1957
1958impl<'a> TryFrom<&'a IcebergCommitResult> for SinkMetadata {
1959    type Error = SinkError;
1960
1961    fn try_from(value: &'a IcebergCommitResult) -> std::result::Result<SinkMetadata, Self::Error> {
1962        let json_data_files = serde_json::Value::Array(
1963            value
1964                .data_files
1965                .iter()
1966                .map(serde_json::to_value)
1967                .collect::<std::result::Result<Vec<serde_json::Value>, _>>()
1968                .context("Can't serialize data files to json")?,
1969        );
1970        let json_value = serde_json::Value::Object(
1971            vec![
1972                (
1973                    SCHEMA_ID.to_owned(),
1974                    serde_json::Value::Number(value.schema_id.into()),
1975                ),
1976                (
1977                    PARTITION_SPEC_ID.to_owned(),
1978                    serde_json::Value::Number(value.partition_spec_id.into()),
1979                ),
1980                (DATA_FILES.to_owned(), json_data_files),
1981            ]
1982            .into_iter()
1983            .collect(),
1984        );
1985        Ok(SinkMetadata {
1986            metadata: Some(Serialized(SerializedMetadata {
1987                metadata: serde_json::to_vec(&json_value)
1988                    .context("Can't serialize iceberg sink metadata")?,
1989            })),
1990        })
1991    }
1992}
1993
1994impl TryFrom<IcebergCommitResult> for Vec<u8> {
1995    type Error = SinkError;
1996
1997    fn try_from(value: IcebergCommitResult) -> std::result::Result<Vec<u8>, Self::Error> {
1998        let json_data_files = serde_json::Value::Array(
1999            value
2000                .data_files
2001                .iter()
2002                .map(serde_json::to_value)
2003                .collect::<std::result::Result<Vec<serde_json::Value>, _>>()
2004                .context("Can't serialize data files to json")?,
2005        );
2006        let json_value = serde_json::Value::Object(
2007            vec![
2008                (
2009                    SCHEMA_ID.to_owned(),
2010                    serde_json::Value::Number(value.schema_id.into()),
2011                ),
2012                (
2013                    PARTITION_SPEC_ID.to_owned(),
2014                    serde_json::Value::Number(value.partition_spec_id.into()),
2015                ),
2016                (DATA_FILES.to_owned(), json_data_files),
2017            ]
2018            .into_iter()
2019            .collect(),
2020        );
2021        Ok(serde_json::to_vec(&json_value).context("Can't serialize iceberg sink metadata")?)
2022    }
2023}
2024pub struct IcebergSinkCommitter {
2025    catalog: Arc<dyn Catalog>,
2026    table: Table,
2027    pub last_commit_epoch: u64,
2028    pub(crate) sink_id: SinkId,
2029    pub(crate) config: IcebergConfig,
2030    pub(crate) param: SinkParam,
2031    commit_retry_num: u32,
2032    pub(crate) iceberg_compact_stat_sender: Option<UnboundedSender<IcebergSinkCompactionUpdate>>,
2033}
2034
2035impl IcebergSinkCommitter {
2036    // Reload the table and guarantee that its current schema_id and partition_spec_id match the
2037    // given `schema_id` and `partition_spec_id`.
2038    async fn reload_table(
2039        catalog: &dyn Catalog,
2040        table_ident: &TableIdent,
2041        schema_id: i32,
2042        partition_spec_id: i32,
2043    ) -> Result<Table> {
2044        let table = catalog
2045            .load_table(table_ident)
2046            .await
2047            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
2048        if table.metadata().current_schema_id() != schema_id {
2049            return Err(SinkError::Iceberg(anyhow!(
2050                "Schema evolution not supported, expect schema id {}, but got {}",
2051                schema_id,
2052                table.metadata().current_schema_id()
2053            )));
2054        }
2055        if table.metadata().default_partition_spec_id() != partition_spec_id {
2056            return Err(SinkError::Iceberg(anyhow!(
2057                "Partition evolution not supported, expect partition spec id {}, but got {}",
2058                partition_spec_id,
2059                table.metadata().default_partition_spec_id()
2060            )));
2061        }
2062        Ok(table)
2063    }
2064}
2065
2066#[async_trait]
2067impl SinglePhaseCommitCoordinator for IcebergSinkCommitter {
2068    async fn init(&mut self) -> Result<()> {
2069        tracing::info!(
2070            sink_id = %self.param.sink_id,
2071            "Iceberg sink coordinator initialized",
2072        );
2073
2074        Ok(())
2075    }
2076
2077    async fn commit_data(&mut self, epoch: u64, metadata: Vec<SinkMetadata>) -> Result<()> {
2078        tracing::debug!("Starting iceberg direct commit in epoch {epoch}");
2079
2080        if metadata.is_empty() {
2081            tracing::debug!(?epoch, "No datafile to commit");
2082            return Ok(());
2083        }
2084
2085        // Commit data if present
2086        if let Some((write_results, snapshot_id)) = self.pre_commit_inner(epoch, metadata)? {
2087            self.commit_data_impl(epoch, write_results, snapshot_id)
2088                .await?;
2089        }
2090
2091        Ok(())
2092    }
2093
2094    async fn commit_schema_change(
2095        &mut self,
2096        epoch: u64,
2097        schema_change: PbSinkSchemaChange,
2098    ) -> Result<()> {
2099        tracing::info!(
2100            "Committing schema change {:?} in epoch {}",
2101            schema_change,
2102            epoch
2103        );
2104        self.commit_schema_change_impl(schema_change).await?;
2105        tracing::info!("Successfully committed schema change in epoch {}", epoch);
2106
2107        Ok(())
2108    }
2109}
2110
2111#[async_trait]
2112impl TwoPhaseCommitCoordinator for IcebergSinkCommitter {
2113    async fn init(&mut self) -> Result<()> {
2114        tracing::info!(
2115            sink_id = %self.param.sink_id,
2116            "Iceberg sink coordinator initialized",
2117        );
2118
2119        Ok(())
2120    }
2121
2122    async fn pre_commit(
2123        &mut self,
2124        epoch: u64,
2125        metadata: Vec<SinkMetadata>,
2126        _schema_change: Option<PbSinkSchemaChange>,
2127    ) -> Result<Option<Vec<u8>>> {
2128        tracing::debug!("Starting iceberg pre commit in epoch {epoch}");
2129
2130        let (write_results, snapshot_id) = match self.pre_commit_inner(epoch, metadata)? {
2131            Some((write_results, snapshot_id)) => (write_results, snapshot_id),
2132            None => {
2133                tracing::debug!(?epoch, "no data to pre commit");
2134                return Ok(None);
2135            }
2136        };
2137
2138        let mut write_results_bytes = Vec::new();
2139        for each_parallelism_write_result in write_results {
2140            let each_parallelism_write_result_bytes: Vec<u8> =
2141                each_parallelism_write_result.try_into()?;
2142            write_results_bytes.push(each_parallelism_write_result_bytes);
2143        }
2144
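        // Append the snapshot id as the final element; `commit_data` pops it back off after
        // `deserialize_metadata`, and the remaining elements are the per-parallelism write results.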
2145        let snapshot_id_bytes: Vec<u8> = snapshot_id.to_le_bytes().to_vec();
2146        write_results_bytes.push(snapshot_id_bytes);
2147
2148        let pre_commit_metadata_bytes: Vec<u8> = serialize_metadata(write_results_bytes);
2149        Ok(Some(pre_commit_metadata_bytes))
2150    }
2151
2152    async fn commit_data(&mut self, epoch: u64, commit_metadata: Vec<u8>) -> Result<()> {
2153        tracing::debug!("Starting iceberg commit in epoch {epoch}");
2154
2155        if commit_metadata.is_empty() {
2156            tracing::debug!(?epoch, "No datafile to commit");
2157            return Ok(());
2158        }
2159
2160        // Deserialize commit metadata
2161        let mut payload = deserialize_metadata(commit_metadata);
2162        if payload.is_empty() {
2163            return Err(SinkError::Iceberg(anyhow!(
2164                "Invalid commit metadata: empty payload"
2165            )));
2166        }
2167
2168        // Last element is snapshot_id
2169        let snapshot_id_bytes = payload.pop().ok_or_else(|| {
2170            SinkError::Iceberg(anyhow!("Invalid commit metadata: missing snapshot_id"))
2171        })?;
2172        let snapshot_id = i64::from_le_bytes(
2173            snapshot_id_bytes
2174                .try_into()
2175                .map_err(|_| SinkError::Iceberg(anyhow!("Invalid snapshot id bytes")))?,
2176        );
2177
2178        // Remaining elements are write_results
2179        let write_results = payload
2180            .into_iter()
2181            .map(IcebergCommitResult::try_from_serialized_bytes)
2182            .collect::<Result<Vec<_>>>()?;
2183
2184        let snapshot_committed = self
2185            .is_snapshot_id_in_iceberg(&self.config, snapshot_id)
2186            .await?;
2187
2188        if snapshot_committed {
2189            tracing::info!(
2190                "Snapshot id {} already committed in iceberg table, skip committing again.",
2191                snapshot_id
2192            );
2193            return Ok(());
2194        }
2195
2196        self.commit_data_impl(epoch, write_results, snapshot_id)
2197            .await
2198    }
2199
2200    async fn commit_schema_change(
2201        &mut self,
2202        epoch: u64,
2203        schema_change: PbSinkSchemaChange,
2204    ) -> Result<()> {
2205        let schema_updated = self.check_schema_change_applied(&schema_change)?;
2206        if schema_updated {
2207            tracing::info!("Schema change already committed in epoch {}, skip", epoch);
2208            return Ok(());
2209        }
2210
2211        tracing::info!(
2212            "Committing schema change {:?} in epoch {}",
2213            schema_change,
2214            epoch
2215        );
2216        self.commit_schema_change_impl(schema_change).await?;
2217        tracing::info!("Successfully committed schema change in epoch {epoch}");
2218
2219        Ok(())
2220    }
2221
2222    async fn abort(&mut self, _epoch: u64, _commit_metadata: Vec<u8>) {
2223        // TODO: Files that have been written but not committed should be deleted.
2224        tracing::debug!("Abort not implemented yet");
2225    }
2226}
2227
2228/// Methods required to achieve exactly-once semantics.
2229impl IcebergSinkCommitter {
2230    fn pre_commit_inner(
2231        &mut self,
2232        _epoch: u64,
2233        metadata: Vec<SinkMetadata>,
2234    ) -> Result<Option<(Vec<IcebergCommitResult>, i64)>> {
2235        let write_results: Vec<IcebergCommitResult> = metadata
2236            .iter()
2237            .map(IcebergCommitResult::try_from)
2238            .collect::<Result<Vec<IcebergCommitResult>>>()?;
2239
2240        // Skip if no data to commit
2241        if write_results.is_empty() || write_results.iter().all(|r| r.data_files.is_empty()) {
2242            return Ok(None);
2243        }
2244
2245        let expect_schema_id = write_results[0].schema_id;
2246        let expect_partition_spec_id = write_results[0].partition_spec_id;
2247
2248        // Guarantee that all write results have the same schema_id and partition_spec_id.
2249        if write_results
2250            .iter()
2251            .any(|r| r.schema_id != expect_schema_id)
2252            || write_results
2253                .iter()
2254                .any(|r| r.partition_spec_id != expect_partition_spec_id)
2255        {
2256            return Err(SinkError::Iceberg(anyhow!(
2257                "schema_id and partition_spec_id should be the same in all write results"
2258            )));
2259        }
2260
2261        let snapshot_id = FastAppendAction::generate_snapshot_id(&self.table);
2262
2263        Ok(Some((write_results, snapshot_id)))
2264    }
2265
2266    async fn commit_data_impl(
2267        &mut self,
2268        epoch: u64,
2269        write_results: Vec<IcebergCommitResult>,
2270        snapshot_id: i64,
2271    ) -> Result<()> {
2272        // Empty write results should be handled before calling this function.
2273        assert!(
2274            !write_results.is_empty() && !write_results.iter().all(|r| r.data_files.is_empty())
2275        );
2276
2277        // Check snapshot limit before proceeding with commit
2278        self.wait_for_snapshot_limit().await?;
2279
2280        let expect_schema_id = write_results[0].schema_id;
2281        let expect_partition_spec_id = write_results[0].partition_spec_id;
2282
2283        // Load the latest table, on a best-effort basis, to avoid concurrent modification.
2284        self.table = Self::reload_table(
2285            self.catalog.as_ref(),
2286            self.table.identifier(),
2287            expect_schema_id,
2288            expect_partition_spec_id,
2289        )
2290        .await?;
2291
2292        let Some(schema) = self.table.metadata().schema_by_id(expect_schema_id) else {
2293            return Err(SinkError::Iceberg(anyhow!(
2294                "Can't find schema by id {}",
2295                expect_schema_id
2296            )));
2297        };
2298        let Some(partition_spec) = self
2299            .table
2300            .metadata()
2301            .partition_spec_by_id(expect_partition_spec_id)
2302        else {
2303            return Err(SinkError::Iceberg(anyhow!(
2304                "Can't find partition spec by id {}",
2305                expect_partition_spec_id
2306            )));
2307        };
2308        let partition_type = partition_spec
2309            .as_ref()
2310            .clone()
2311            .partition_type(schema)
2312            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
2313
2314        let data_files = write_results
2315            .into_iter()
2316            .flat_map(|r| {
2317                r.data_files.into_iter().map(|f| {
2318                    f.try_into(expect_partition_spec_id, &partition_type, schema)
2319                        .map_err(|err| SinkError::Iceberg(anyhow!(err)))
2320                })
2321            })
2322            .collect::<Result<Vec<DataFile>>>()?;
2323        // TODO:
2324        // This retry behavior should be reverted and handled inside iceberg-rust once it supports retry (tracked in: https://github.com/apache/iceberg-rust/issues/964),
2325        // because proper retry logic involves reapplying the commit metadata.
2326        // For now, we simply retry the whole commit operation.
2327        let retry_strategy = ExponentialBackoff::from_millis(10)
2328            .max_delay(Duration::from_secs(60))
2329            .map(jitter)
2330            .take(self.commit_retry_num as usize);
2331        let catalog = self.catalog.clone();
2332        let table_ident = self.table.identifier().clone();
2333
2334        // Custom retry logic that:
2335        // 1. Calls reload_table before each commit attempt to get the latest metadata
2336        // 2. If reload_table fails (table not exists/schema/partition mismatch), stops retrying immediately
2337        // 3. If commit fails, retries with backoff
2338        enum CommitError {
2339            ReloadTable(SinkError), // Non-retriable: schema/partition mismatch
2340            Commit(SinkError),      // Retriable: commit conflicts, network errors
2341        }
2342
2343        let table = RetryIf::spawn(
2344            retry_strategy,
2345            || async {
2346                // Reload table before each commit attempt to get the latest metadata
2347                let table = Self::reload_table(
2348                    catalog.as_ref(),
2349                    &table_ident,
2350                    expect_schema_id,
2351                    expect_partition_spec_id,
2352                )
2353                .await
2354                .map_err(|e| {
2355                    tracing::error!(error = %e.as_report(), "Failed to reload iceberg table");
2356                    CommitError::ReloadTable(e)
2357                })?;
2358
2359                let txn = Transaction::new(&table);
2360                let append_action = txn
2361                    .fast_append()
2362                    .set_snapshot_id(snapshot_id)
2363                    .set_target_branch(commit_branch(
2364                        self.config.r#type.as_str(),
2365                        self.config.write_mode,
2366                    ))
2367                    .add_data_files(data_files.clone());
2368
2369                let tx = append_action.apply(txn).map_err(|err| {
2370                    let err: IcebergError = err.into();
2371                    tracing::error!(error = %err.as_report(), "Failed to apply iceberg table");
2372                    CommitError::Commit(SinkError::Iceberg(anyhow!(err)))
2373                })?;
2374
2375                tx.commit(catalog.as_ref()).await.map_err(|err| {
2376                    let err: IcebergError = err.into();
2377                    tracing::error!(error = %err.as_report(), "Failed to commit iceberg table");
2378                    CommitError::Commit(SinkError::Iceberg(anyhow!(err)))
2379                })
2380            },
2381            |err: &CommitError| {
2382                // Only retry on commit errors, not on reload_table errors
2383                match err {
2384                    CommitError::Commit(_) => {
2385                        tracing::warn!("Commit failed, will retry");
2386                        true
2387                    }
2388                    CommitError::ReloadTable(_) => {
2389                        tracing::error!(
2390                            "reload_table failed with non-retriable error, will not retry"
2391                        );
2392                        false
2393                    }
2394                }
2395            },
2396        )
2397        .await
2398        .map_err(|e| match e {
2399            CommitError::ReloadTable(e) | CommitError::Commit(e) => e,
2400        })?;
2401        self.table = table;
2402
2403        let snapshot_num = self.table.metadata().snapshots().count();
2404        let catalog_name = self.config.common.catalog_name();
2405        let table_name = self.table.identifier().to_string();
2406        let metrics_labels = [&self.param.sink_name, &catalog_name, &table_name];
2407        GLOBAL_SINK_METRICS
2408            .iceberg_snapshot_num
2409            .with_guarded_label_values(&metrics_labels)
2410            .set(snapshot_num as i64);
2411
2412        tracing::debug!("Succeeded to commit to iceberg table in epoch {epoch}.");
2413
2414        if let Some(iceberg_compact_stat_sender) = &self.iceberg_compact_stat_sender
2415            && self.config.enable_compaction
2416            && iceberg_compact_stat_sender
2417                .send(IcebergSinkCompactionUpdate {
2418                    sink_id: self.sink_id,
2419                    compaction_interval: self.config.compaction_interval_sec(),
2420                    force_compaction: false,
2421                })
2422                .is_err()
2423        {
2424            warn!("failed to send iceberg compaction stats");
2425        }
2426
2427        Ok(())
2428    }
2429
2430    /// During pre-commit, we record the `snapshot_id` corresponding to each batch of files.
2431    /// Therefore, checking whether all files in a batch are already present in Iceberg reduces to
2432    /// verifying that their corresponding `snapshot_id` exists in the Iceberg table.
2433    async fn is_snapshot_id_in_iceberg(
2434        &self,
2435        iceberg_config: &IcebergConfig,
2436        snapshot_id: i64,
2437    ) -> Result<bool> {
2438        let table = iceberg_config.load_table().await?;
2439        if table.metadata().snapshot_by_id(snapshot_id).is_some() {
2440            Ok(true)
2441        } else {
2442            Ok(false)
2443        }
2444    }
2445
2446    /// Check if the specified columns already exist in the iceberg table's current schema.
2447    /// This is used to determine if schema change has already been applied.
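    /// Returns `Ok(false)` if the table still matches the original schema, `Ok(true)` if it
    /// already matches the original schema plus the added columns, and an error otherwise.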
2448    fn check_schema_change_applied(&self, schema_change: &PbSinkSchemaChange) -> Result<bool> {
2449        let current_schema = self.table.metadata().current_schema();
2450        let current_arrow_schema = schema_to_arrow_schema(current_schema.as_ref())
2451            .context("Failed to convert schema")
2452            .map_err(SinkError::Iceberg)?;
2453
2454        let iceberg_arrow_convert = IcebergArrowConvert;
2455
2456        let schema_matches = |expected: &[ArrowField]| {
2457            if current_arrow_schema.fields().len() != expected.len() {
2458                return false;
2459            }
2460
2461            expected.iter().all(|expected_field| {
2462                current_arrow_schema.fields().iter().any(|current_field| {
2463                    current_field.name() == expected_field.name()
2464                        && current_field.data_type() == expected_field.data_type()
2465                })
2466            })
2467        };
2468
2469        let original_arrow_fields: Vec<ArrowField> = schema_change
2470            .original_schema
2471            .iter()
2472            .map(|pb_field| {
2473                let field = Field::from(pb_field);
2474                iceberg_arrow_convert
2475                    .to_arrow_field(&field.name, &field.data_type)
2476                    .context("Failed to convert field to arrow")
2477                    .map_err(SinkError::Iceberg)
2478            })
2479            .collect::<Result<_>>()?;
2480
2481        // If current schema equals original_schema, then schema change is NOT applied.
2482        if schema_matches(&original_arrow_fields) {
2483            tracing::debug!(
2484                "Current iceberg schema matches original_schema ({} columns); schema change not applied",
2485                original_arrow_fields.len()
2486            );
2487            return Ok(false);
2488        }
2489
2490        // We only support add_columns for now.
2491        let Some(risingwave_pb::stream_plan::sink_schema_change::Op::AddColumns(add_columns_op)) =
2492            schema_change.op.as_ref()
2493        else {
2494            return Err(SinkError::Iceberg(anyhow!(
2495                "Unsupported sink schema change op in iceberg sink: {:?}",
2496                schema_change.op
2497            )));
2498        };
2499
2500        let add_arrow_fields: Vec<ArrowField> = add_columns_op
2501            .fields
2502            .iter()
2503            .map(|pb_field| {
2504                let field = Field::from(pb_field);
2505                iceberg_arrow_convert
2506                    .to_arrow_field(&field.name, &field.data_type)
2507                    .context("Failed to convert field to arrow")
2508                    .map_err(SinkError::Iceberg)
2509            })
2510            .collect::<Result<_>>()?;
2511
2512        let mut expected_after_change = original_arrow_fields;
2513        expected_after_change.extend(add_arrow_fields);
2514
2515        // If current schema equals original_schema + add_columns, then schema change is applied.
2516        if schema_matches(&expected_after_change) {
2517            tracing::debug!(
2518                "Current iceberg schema matches original_schema + add_columns ({} columns); schema change already applied",
2519                expected_after_change.len()
2520            );
2521            return Ok(true);
2522        }
2523
2524        Err(SinkError::Iceberg(anyhow!(
2525            "Current iceberg schema does not match either original_schema ({} cols) or original_schema + add_columns; cannot determine whether schema change is applied",
2526            schema_change.original_schema.len()
2527        )))
2528    }
2529
2530    /// Commit schema changes (e.g., add columns) to the iceberg table.
2531    /// This function uses Transaction API to atomically update the table schema
2532    /// with optimistic locking to prevent concurrent conflicts.
2533    async fn commit_schema_change_impl(&mut self, schema_change: PbSinkSchemaChange) -> Result<()> {
2534        use iceberg::spec::NestedField;
2535
2536        let Some(risingwave_pb::stream_plan::sink_schema_change::Op::AddColumns(add_columns_op)) =
2537            schema_change.op.as_ref()
2538        else {
2539            return Err(SinkError::Iceberg(anyhow!(
2540                "Unsupported sink schema change op in iceberg sink: {:?}",
2541                schema_change.op
2542            )));
2543        };
2544
2545        let add_columns = add_columns_op.fields.iter().map(Field::from).collect_vec();
2546
2547        // Step 1: Get current table metadata
2548        let metadata = self.table.metadata();
2549        let mut next_field_id = metadata.last_column_id() + 1;
2550        tracing::debug!("Starting schema change, next_field_id: {}", next_field_id);
2551
2552        // Step 2: Build new fields to add
2553        let iceberg_create_table_arrow_convert = IcebergCreateTableArrowConvert::default();
2554        let mut new_fields = Vec::new();
2555
2556        for field in &add_columns {
2557            // Convert RisingWave Field to Arrow Field using IcebergCreateTableArrowConvert
2558            let arrow_field = iceberg_create_table_arrow_convert
2559                .to_arrow_field(&field.name, &field.data_type)
2560                .with_context(|| format!("Failed to convert field '{}' to arrow", field.name))
2561                .map_err(SinkError::Iceberg)?;
2562
2563            // Convert Arrow DataType to Iceberg Type
2564            let iceberg_type = iceberg::arrow::arrow_type_to_type(arrow_field.data_type())
2565                .map_err(|err| {
2566                    SinkError::Iceberg(
2567                        anyhow!(err).context("Failed to convert Arrow type to Iceberg type"),
2568                    )
2569                })?;
2570
2571            // Create NestedField with the next available field ID
2572            let nested_field = Arc::new(NestedField::optional(
2573                next_field_id,
2574                &field.name,
2575                iceberg_type,
2576            ));
2577
2578            new_fields.push(nested_field);
2579            tracing::info!("Prepared field '{}' with ID {}", field.name, next_field_id);
2580            next_field_id += 1;
2581        }
2582
2583        // Step 3: Create Transaction with UpdateSchemaAction
2584        tracing::info!(
2585            "Committing schema change to catalog for table {}",
2586            self.table.identifier()
2587        );
2588
2589        let txn = Transaction::new(&self.table);
2590        let action = txn.update_schema().add_fields(new_fields);
2591
2592        let updated_table = action
2593            .apply(txn)
2594            .context("Failed to apply schema update action")
2595            .map_err(SinkError::Iceberg)?
2596            .commit(self.catalog.as_ref())
2597            .await
2598            .context("Failed to commit table schema change")
2599            .map_err(SinkError::Iceberg)?;
2600
2601        self.table = updated_table;
2602
2603        tracing::info!(
2604            "Successfully committed schema change, added {} columns to iceberg table",
2605            add_columns.len()
2606        );
2607
2608        Ok(())
2609    }
2610
2611    /// Count the snapshots created since the last rewrite/overwrite (`Replace`) operation.
2612    /// The caller compares this count against the configured snapshot limit.
2613    fn count_snapshots_since_rewrite(&self) -> usize {
2614        let mut snapshots: Vec<_> = self.table.metadata().snapshots().collect();
2615        snapshots.sort_by_key(|b| std::cmp::Reverse(b.timestamp_ms()));
2616
2617        // Iterate through snapshots in reverse order (newest first) to find the last rewrite/overwrite
2618        let mut count = 0;
2619        for snapshot in snapshots {
2620            // Check if this snapshot represents a rewrite or overwrite operation
2621            let summary = snapshot.summary();
2622            match &summary.operation {
2623                Operation::Replace => {
2624                    // Found a rewrite/overwrite operation, stop counting
2625                    break;
2626                }
2627
2628                _ => {
2629                    // Increment count for each snapshot that is not a rewrite/overwrite
2630                    count += 1;
2631                }
2632            }
2633        }
2634
2635        count
2636    }
2637
2638    /// Wait until snapshot count since last rewrite is below the limit
2639    async fn wait_for_snapshot_limit(&mut self) -> Result<()> {
2640        if !self.config.enable_compaction {
2641            return Ok(());
2642        }
2643
2644        if let Some(max_snapshots) = self.config.max_snapshots_num_before_compaction {
2645            loop {
2646                let current_count = self.count_snapshots_since_rewrite();
2647
2648                if current_count < max_snapshots {
2649                    tracing::info!(
2650                        "Snapshot count check passed: {} < {}",
2651                        current_count,
2652                        max_snapshots
2653                    );
2654                    break;
2655                }
2656
2657                tracing::info!(
2658                    "Snapshot count {} exceeds limit {}, waiting...",
2659                    current_count,
2660                    max_snapshots
2661                );
2662
2663                if let Some(iceberg_compact_stat_sender) = &self.iceberg_compact_stat_sender
2664                    && iceberg_compact_stat_sender
2665                        .send(IcebergSinkCompactionUpdate {
2666                            sink_id: self.sink_id,
2667                            compaction_interval: self.config.compaction_interval_sec(),
2668                            force_compaction: true,
2669                        })
2670                        .is_err()
2671                {
2672                    tracing::warn!("failed to send iceberg compaction stats");
2673                }
2674
2675                // Wait for 30 seconds before checking again
2676                tokio::time::sleep(Duration::from_secs(30)).await;
2677
2678                // Reload table to get latest snapshots
2679                self.table = self.config.load_table().await?;
2680            }
2681        }
2682        Ok(())
2683    }
2684}
2685
2686const MAP_KEY: &str = "key";
2687const MAP_VALUE: &str = "value";
2688
2689fn get_fields<'a>(
2690    our_field_type: &'a risingwave_common::types::DataType,
2691    data_type: &ArrowDataType,
2692    schema_fields: &mut HashMap<&'a str, &'a risingwave_common::types::DataType>,
2693) -> Option<ArrowFields> {
2694    match data_type {
2695        ArrowDataType::Struct(fields) => {
2696            match our_field_type {
2697                risingwave_common::types::DataType::Struct(struct_fields) => {
2698                    struct_fields.iter().for_each(|(name, data_type)| {
2699                        let res = schema_fields.insert(name, data_type);
2700                        // This assert is to make sure there is no duplicate field name in the schema.
2701                        assert!(res.is_none())
2702                    });
2703                }
2704                risingwave_common::types::DataType::Map(map_fields) => {
2705                    schema_fields.insert(MAP_KEY, map_fields.key());
2706                    schema_fields.insert(MAP_VALUE, map_fields.value());
2707                }
2708                risingwave_common::types::DataType::List(list) => {
2709                    list.elem()
2710                        .as_struct()
2711                        .iter()
2712                        .for_each(|(name, data_type)| {
2713                            let res = schema_fields.insert(name, data_type);
2714                            // This assert is to make sure there is no duplicate field name in the schema.
2715                            assert!(res.is_none())
2716                        });
2717                }
2718                _ => {}
2719            };
2720            Some(fields.clone())
2721        }
2722        ArrowDataType::List(field) | ArrowDataType::Map(field, _) => {
2723            get_fields(our_field_type, field.data_type(), schema_fields)
2724        }
2725        _ => None, // not a supported complex type and unlikely to show up
2726    }
2727}
2728
2729fn check_compatibility(
2730    schema_fields: HashMap<&str, &risingwave_common::types::DataType>,
2731    fields: &ArrowFields,
2732) -> anyhow::Result<bool> {
2733    for arrow_field in fields {
2734        let our_field_type = schema_fields
2735            .get(arrow_field.name().as_str())
2736            .ok_or_else(|| anyhow!("Field {} not found in our schema", arrow_field.name()))?;
2737
2738        // Iceberg source should be able to read iceberg decimal type.
2739        let converted_arrow_data_type = IcebergArrowConvert
2740            .to_arrow_field("", our_field_type)
2741            .map_err(|e| anyhow!(e))?
2742            .data_type()
2743            .clone();
2744
2745        let compatible = match (&converted_arrow_data_type, arrow_field.data_type()) {
2746            (ArrowDataType::Decimal128(_, _), ArrowDataType::Decimal128(_, _)) => true,
2747            (ArrowDataType::Binary, ArrowDataType::LargeBinary) => true,
2748            (ArrowDataType::LargeBinary, ArrowDataType::Binary) => true,
2749            (ArrowDataType::List(_), ArrowDataType::List(field))
2750            | (ArrowDataType::Map(_, _), ArrowDataType::Map(field, _)) => {
2751                let mut schema_fields = HashMap::new();
2752                get_fields(our_field_type, field.data_type(), &mut schema_fields)
2753                    .is_none_or(|fields| check_compatibility(schema_fields, &fields).unwrap())
2754            }
2755            // validate nested structs
2756            (ArrowDataType::Struct(_), ArrowDataType::Struct(fields)) => {
2757                let mut schema_fields = HashMap::new();
2758                our_field_type
2759                    .as_struct()
2760                    .iter()
2761                    .for_each(|(name, data_type)| {
2762                        let res = schema_fields.insert(name, data_type);
2763                        // This assert is to make sure there is no duplicate field name in the schema.
2764                        assert!(res.is_none())
2765                    });
2766                check_compatibility(schema_fields, fields)?
2767            }
2768            // cases where left != right (metadata, field name mismatch)
2769            //
2770            // all nested types: in iceberg `field_id` will always be present, but RW doesn't have it:
2771            // {"PARQUET:field_id": ".."}
2772            //
2773            // map: The standard name in arrow is "entries", "key", "value".
2774            // in iceberg-rs, it's called "key_value"
2775            (left, right) => left.equals_datatype(right),
2776        };
2777        if !compatible {
2778            bail!(
2779                "field {}'s type is incompatible\nRisingWave converted data type: {}\niceberg's data type: {}",
2780                arrow_field.name(),
2781                converted_arrow_data_type,
2782                arrow_field.data_type()
2783            );
2784        }
2785    }
2786    Ok(true)
2787}
2788
2789/// Check that the RisingWave schema is compatible with the Iceberg (Arrow) schema: the field counts must match, and fields are matched by name (order may differ) with compatible types.
2790pub fn try_matches_arrow_schema(rw_schema: &Schema, arrow_schema: &ArrowSchema) -> Result<()> {
2791    if rw_schema.fields.len() != arrow_schema.fields().len() {
2792        bail!(
2793            "Schema field count mismatch: RisingWave has {} fields, Iceberg has {}",
2794            rw_schema.fields.len(),
2795            arrow_schema.fields.len()
2796        );
2797    }
2798
2799    let mut schema_fields = HashMap::new();
2800    rw_schema.fields.iter().for_each(|field| {
2801        let res = schema_fields.insert(field.name.as_str(), &field.data_type);
2802        // This assert is to make sure there is no duplicate field name in the schema.
2803        // Ensure there are no duplicate field names in the schema.
2804    });
2805
2806    check_compatibility(schema_fields, &arrow_schema.fields)?;
2807    Ok(())
2808}
2809
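// JSON helpers for round-tripping a batch of opaque metadata blobs, e.g. the
// payload carried in `SinkMetadata`'s `Serialized` variant.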
2810fn serialize_metadata(metadata: Vec<Vec<u8>>) -> Vec<u8> {
2811    serde_json::to_vec(&metadata).unwrap()
2812}
2813
2814fn deserialize_metadata(bytes: Vec<u8>) -> Vec<Vec<u8>> {
2815    serde_json::from_slice(&bytes).unwrap()
2816}
2817
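/// Parse the `partition_by` sink option into `(column, Transform)` pairs.
///
/// A bare column name maps to the identity transform, while `bucket` and
/// `truncate` additionally require a parameter `n`. Illustrative examples of
/// the accepted formats:
///
/// * `v1`             -> `("v1", Transform::Identity)`
/// * `bucket(5,v1)`   -> `("v1", Transform::Bucket(5))`
/// * `truncate(4,v2)` -> `("v2", Transform::Truncate(4))`
/// * `year(v3)`       -> `("v3", Transform::Year)`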
2818pub fn parse_partition_by_exprs(
2819    expr: String,
2820) -> std::result::Result<Vec<(String, Transform)>, anyhow::Error> {
2821    // captures column, transform(column), transform(n,column), transform(n, column)
2822    let re = Regex::new(r"(?<transform>\w+)(\(((?<n>\d+)?(?:,|(,\s)))?(?<field>\w+)\))?").unwrap();
2823    if !re.is_match(&expr) {
2824        bail!(
2825            "Invalid partition fields: {}\nHINT: Supported formats are column, transform(column), transform(n,column), transform(n, column)",
2826            expr
2827        )
2828    }
2829    let caps = re.captures_iter(&expr);
2830
2831    let mut partition_columns = vec![];
2832
2833    for mat in caps {
2834        let (column, transform) = if mat.name("n").is_none() && mat.name("field").is_none() {
2835            (&mat["transform"], Transform::Identity)
2836        } else {
2837            let mut func = mat["transform"].to_owned();
2838            if func == "bucket" || func == "truncate" {
2839                let n = &mat
2840                    .name("n")
2841                    .ok_or_else(|| anyhow!("`n` must be specified for `bucket` and `truncate` transforms"))?
2842                    .as_str();
2843                func = format!("{func}[{n}]");
2844            }
2845            (
2846                &mat["field"],
2847                Transform::from_str(&func)
2848                    .with_context(|| format!("invalid transform function {}", func))?,
2849            )
2850        };
2851        partition_columns.push((column.to_owned(), transform));
2852    }
2853    Ok(partition_columns)
2854}
2855
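/// Branch that sink commits target: the dedicated `ICEBERG_COW_BRANCH` when
/// copy-on-write is in effect for an upsert sink, otherwise Iceberg's
/// `MAIN_BRANCH`.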
2856pub fn commit_branch(sink_type: &str, write_mode: IcebergWriteMode) -> String {
2857    if should_enable_iceberg_cow(sink_type, write_mode) {
2858        ICEBERG_COW_BRANCH.to_owned()
2859    } else {
2860        MAIN_BRANCH.to_owned()
2861    }
2862}
2863
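/// Copy-on-write is only enabled for upsert sinks that are configured with
/// `IcebergWriteMode::CopyOnWrite`.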
2864pub fn should_enable_iceberg_cow(sink_type: &str, write_mode: IcebergWriteMode) -> bool {
2865    sink_type == SINK_TYPE_UPSERT && write_mode == IcebergWriteMode::CopyOnWrite
2866}
2867
2868impl crate::with_options::WithOptions for IcebergWriteMode {}
2869
2870impl crate::with_options::WithOptions for FormatVersion {}
2871
2872impl crate::with_options::WithOptions for CompactionType {}
2873
2874#[cfg(test)]
2875mod test {
2876    use std::collections::BTreeMap;
2877
2878    use risingwave_common::array::arrow::arrow_schema_iceberg::FieldRef as ArrowFieldRef;
2879    use risingwave_common::types::{DataType, MapType, StructType};
2880
2881    use crate::connector_common::{IcebergCommon, IcebergTableIdentifier};
2882    use crate::sink::decouple_checkpoint_log_sink::ICEBERG_DEFAULT_COMMIT_CHECKPOINT_INTERVAL;
2883    use crate::sink::iceberg::{
2884        COMPACTION_INTERVAL_SEC, COMPACTION_MAX_SNAPSHOTS_NUM, CompactionType, ENABLE_COMPACTION,
2885        ENABLE_SNAPSHOT_EXPIRATION, FormatVersion, IcebergConfig, IcebergWriteMode,
2886        SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_FILES, SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_META_DATA,
2887        SNAPSHOT_EXPIRATION_MAX_AGE_MILLIS, SNAPSHOT_EXPIRATION_RETAIN_LAST, WRITE_MODE,
2888    };
2889
2890    pub const DEFAULT_ICEBERG_COMPACTION_INTERVAL: u64 = 3600; // 1 hour
2891
2892    #[test]
2893    fn test_compatible_arrow_schema() {
2894        use arrow_schema_iceberg::{DataType as ArrowDataType, Field as ArrowField};
2895
2896        use super::*;
2897        let risingwave_schema = Schema::new(vec![
2898            Field::with_name(DataType::Int32, "a"),
2899            Field::with_name(DataType::Int32, "b"),
2900            Field::with_name(DataType::Int32, "c"),
2901        ]);
2902        let arrow_schema = ArrowSchema::new(vec![
2903            ArrowField::new("a", ArrowDataType::Int32, false),
2904            ArrowField::new("b", ArrowDataType::Int32, false),
2905            ArrowField::new("c", ArrowDataType::Int32, false),
2906        ]);
2907
2908        try_matches_arrow_schema(&risingwave_schema, &arrow_schema).unwrap();
2909
2910        let risingwave_schema = Schema::new(vec![
2911            Field::with_name(DataType::Int32, "d"),
2912            Field::with_name(DataType::Int32, "c"),
2913            Field::with_name(DataType::Int32, "a"),
2914            Field::with_name(DataType::Int32, "b"),
2915        ]);
2916        let arrow_schema = ArrowSchema::new(vec![
2917            ArrowField::new("a", ArrowDataType::Int32, false),
2918            ArrowField::new("b", ArrowDataType::Int32, false),
2919            ArrowField::new("d", ArrowDataType::Int32, false),
2920            ArrowField::new("c", ArrowDataType::Int32, false),
2921        ]);
2922        try_matches_arrow_schema(&risingwave_schema, &arrow_schema).unwrap();
2923
2924        let risingwave_schema = Schema::new(vec![
2925            Field::with_name(
2926                DataType::Struct(StructType::new(vec![
2927                    ("a1", DataType::Int32),
2928                    (
2929                        "a2",
2930                        DataType::Struct(StructType::new(vec![
2931                            ("a21", DataType::Bytea),
2932                            (
2933                                "a22",
2934                                DataType::Map(MapType::from_kv(DataType::Varchar, DataType::Jsonb)),
2935                            ),
2936                        ])),
2937                    ),
2938                ])),
2939                "a",
2940            ),
2941            Field::with_name(
2942                DataType::list(DataType::Struct(StructType::new(vec![
2943                    ("b1", DataType::Int32),
2944                    ("b2", DataType::Bytea),
2945                    (
2946                        "b3",
2947                        DataType::Map(MapType::from_kv(DataType::Varchar, DataType::Jsonb)),
2948                    ),
2949                ]))),
2950                "b",
2951            ),
2952            Field::with_name(
2953                DataType::Map(MapType::from_kv(
2954                    DataType::Varchar,
2955                    DataType::list(DataType::Struct(StructType::new([
2956                        ("c1", DataType::Int32),
2957                        ("c2", DataType::Bytea),
2958                        (
2959                            "c3",
2960                            DataType::Map(MapType::from_kv(DataType::Varchar, DataType::Jsonb)),
2961                        ),
2962                    ]))),
2963                )),
2964                "c",
2965            ),
2966        ]);
2967        let arrow_schema = ArrowSchema::new(vec![
2968            ArrowField::new(
2969                "a",
2970                ArrowDataType::Struct(ArrowFields::from(vec![
2971                    ArrowField::new("a1", ArrowDataType::Int32, false),
2972                    ArrowField::new(
2973                        "a2",
2974                        ArrowDataType::Struct(ArrowFields::from(vec![
2975                            ArrowField::new("a21", ArrowDataType::LargeBinary, false),
2976                            ArrowField::new_map(
2977                                "a22",
2978                                "entries",
2979                                ArrowFieldRef::new(ArrowField::new(
2980                                    "key",
2981                                    ArrowDataType::Utf8,
2982                                    false,
2983                                )),
2984                                ArrowFieldRef::new(ArrowField::new(
2985                                    "value",
2986                                    ArrowDataType::Utf8,
2987                                    false,
2988                                )),
2989                                false,
2990                                false,
2991                            ),
2992                        ])),
2993                        false,
2994                    ),
2995                ])),
2996                false,
2997            ),
2998            ArrowField::new(
2999                "b",
3000                ArrowDataType::List(ArrowFieldRef::new(ArrowField::new_list_field(
3001                    ArrowDataType::Struct(ArrowFields::from(vec![
3002                        ArrowField::new("b1", ArrowDataType::Int32, false),
3003                        ArrowField::new("b2", ArrowDataType::LargeBinary, false),
3004                        ArrowField::new_map(
3005                            "b3",
3006                            "entries",
3007                            ArrowFieldRef::new(ArrowField::new("key", ArrowDataType::Utf8, false)),
3008                            ArrowFieldRef::new(ArrowField::new(
3009                                "value",
3010                                ArrowDataType::Utf8,
3011                                false,
3012                            )),
3013                            false,
3014                            false,
3015                        ),
3016                    ])),
3017                    false,
3018                ))),
3019                false,
3020            ),
3021            ArrowField::new_map(
3022                "c",
3023                "entries",
3024                ArrowFieldRef::new(ArrowField::new("key", ArrowDataType::Utf8, false)),
3025                ArrowFieldRef::new(ArrowField::new(
3026                    "value",
3027                    ArrowDataType::List(ArrowFieldRef::new(ArrowField::new_list_field(
3028                        ArrowDataType::Struct(ArrowFields::from(vec![
3029                            ArrowField::new("c1", ArrowDataType::Int32, false),
3030                            ArrowField::new("c2", ArrowDataType::LargeBinary, false),
3031                            ArrowField::new_map(
3032                                "c3",
3033                                "entries",
3034                                ArrowFieldRef::new(ArrowField::new(
3035                                    "key",
3036                                    ArrowDataType::Utf8,
3037                                    false,
3038                                )),
3039                                ArrowFieldRef::new(ArrowField::new(
3040                                    "value",
3041                                    ArrowDataType::Utf8,
3042                                    false,
3043                                )),
3044                                false,
3045                                false,
3046                            ),
3047                        ])),
3048                        false,
3049                    ))),
3050                    false,
3051                )),
3052                false,
3053                false,
3054            ),
3055        ]);
3056        try_matches_arrow_schema(&risingwave_schema, &arrow_schema).unwrap();
3057    }
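
    /// Illustrative negative cases for `try_matches_arrow_schema`: a field-count
    /// mismatch and an incompatible field type should both be rejected.
    #[test]
    fn test_incompatible_arrow_schema() {
        use arrow_schema_iceberg::{DataType as ArrowDataType, Field as ArrowField};

        use super::*;

        // Field-count mismatch.
        let risingwave_schema = Schema::new(vec![Field::with_name(DataType::Int32, "a")]);
        let arrow_schema = ArrowSchema::new(vec![
            ArrowField::new("a", ArrowDataType::Int32, false),
            ArrowField::new("b", ArrowDataType::Int32, false),
        ]);
        assert!(try_matches_arrow_schema(&risingwave_schema, &arrow_schema).is_err());

        // Same field name, incompatible type.
        let risingwave_schema = Schema::new(vec![Field::with_name(DataType::Varchar, "a")]);
        let arrow_schema =
            ArrowSchema::new(vec![ArrowField::new("a", ArrowDataType::Int32, false)]);
        assert!(try_matches_arrow_schema(&risingwave_schema, &arrow_schema).is_err());
    }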
3058
3059    #[test]
3060    fn test_parse_iceberg_config() {
3061        let values = [
3062            ("connector", "iceberg"),
3063            ("type", "upsert"),
3064            ("primary_key", "v1"),
3065            ("partition_by", "v1, identity(v1), truncate(4,v2), bucket(5,v1), year(v3), month(v4), day(v5), hour(v6), void(v1)"),
3066            ("warehouse.path", "s3://iceberg"),
3067            ("s3.endpoint", "http://127.0.0.1:9301"),
3068            ("s3.access.key", "hummockadmin"),
3069            ("s3.secret.key", "hummockadmin"),
3070            ("s3.path.style.access", "true"),
3071            ("s3.region", "us-east-1"),
3072            ("catalog.type", "jdbc"),
3073            ("catalog.name", "demo"),
3074            ("catalog.uri", "jdbc://postgresql://postgres:5432/iceberg"),
3075            ("catalog.jdbc.user", "admin"),
3076            ("catalog.jdbc.password", "123456"),
3077            ("database.name", "demo_db"),
3078            ("table.name", "demo_table"),
3079            ("enable_compaction", "true"),
3080            ("compaction_interval_sec", "1800"),
3081            ("enable_snapshot_expiration", "true"),
3082        ]
3083        .into_iter()
3084        .map(|(k, v)| (k.to_owned(), v.to_owned()))
3085        .collect();
3086
3087        let iceberg_config = IcebergConfig::from_btreemap(values).unwrap();
3088
3089        let expected_iceberg_config = IcebergConfig {
3090            common: IcebergCommon {
3091                warehouse_path: Some("s3://iceberg".to_owned()),
3092                catalog_uri: Some("jdbc://postgresql://postgres:5432/iceberg".to_owned()),
3093                s3_region: Some("us-east-1".to_owned()),
3094                s3_endpoint: Some("http://127.0.0.1:9301".to_owned()),
3095                s3_access_key: Some("hummockadmin".to_owned()),
3096                s3_secret_key: Some("hummockadmin".to_owned()),
3097                s3_iam_role_arn: None,
3098                gcs_credential: None,
3099                catalog_type: Some("jdbc".to_owned()),
3100                glue_id: None,
3101                glue_region: None,
3102                glue_access_key: None,
3103                glue_secret_key: None,
3104                glue_iam_role_arn: None,
3105                catalog_name: Some("demo".to_owned()),
3106                s3_path_style_access: Some(true),
3107                catalog_credential: None,
3108                catalog_oauth2_server_uri: None,
3109                catalog_scope: None,
3110                catalog_token: None,
3111                enable_config_load: None,
3112                rest_signing_name: None,
3113                rest_signing_region: None,
3114                rest_sigv4_enabled: None,
3115                hosted_catalog: None,
3116                azblob_account_name: None,
3117                azblob_account_key: None,
3118                azblob_endpoint_url: None,
3119                catalog_header: None,
3120                adlsgen2_account_name: None,
3121                adlsgen2_account_key: None,
3122                adlsgen2_endpoint: None,
3123                vended_credentials: None,
3124                catalog_security: None,
3125                gcp_auth_scopes: None,
3126                catalog_io_impl: None,
3127            },
3128            table: IcebergTableIdentifier {
3129                database_name: Some("demo_db".to_owned()),
3130                table_name: "demo_table".to_owned(),
3131            },
3132            r#type: "upsert".to_owned(),
3133            force_append_only: false,
3134            primary_key: Some(vec!["v1".to_owned()]),
3135            partition_by: Some("v1, identity(v1), truncate(4,v2), bucket(5,v1), year(v3), month(v4), day(v5), hour(v6), void(v1)".to_owned()),
3136            java_catalog_props: [("jdbc.user", "admin"), ("jdbc.password", "123456")]
3137                .into_iter()
3138                .map(|(k, v)| (k.to_owned(), v.to_owned()))
3139                .collect(),
3140            commit_checkpoint_interval: ICEBERG_DEFAULT_COMMIT_CHECKPOINT_INTERVAL,
3141            create_table_if_not_exists: false,
3142            is_exactly_once: Some(true),
3143            commit_retry_num: 8,
3144            enable_compaction: true,
3145            compaction_interval_sec: Some(DEFAULT_ICEBERG_COMPACTION_INTERVAL / 2),
3146            enable_snapshot_expiration: true,
3147            write_mode: IcebergWriteMode::MergeOnRead,
3148            format_version: FormatVersion::V2,
3149            snapshot_expiration_max_age_millis: None,
3150            snapshot_expiration_retain_last: None,
3151            snapshot_expiration_clear_expired_files: true,
3152            snapshot_expiration_clear_expired_meta_data: true,
3153            max_snapshots_num_before_compaction: None,
3154            small_files_threshold_mb: None,
3155            delete_files_count_threshold: None,
3156            trigger_snapshot_count: None,
3157            target_file_size_mb: None,
3158            compaction_type: None,
3159        };
3160
3161        assert_eq!(iceberg_config, expected_iceberg_config);
3162
3163        assert_eq!(
3164            &iceberg_config.full_table_name().unwrap().to_string(),
3165            "demo_db.demo_table"
3166        );
3167    }
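
    /// Illustrative coverage for `parse_partition_by_exprs`; a minimal sketch that
    /// assumes the `bucket[n]` / `truncate[n]` strings built by the parser are the
    /// transform representations accepted by iceberg-rs.
    #[test]
    fn test_parse_partition_by_exprs() {
        use iceberg::spec::Transform;

        use super::parse_partition_by_exprs;

        let parsed = parse_partition_by_exprs(
            "v1, identity(v2), bucket(5,v3), truncate(4,v4), year(v5)".to_owned(),
        )
        .unwrap();
        assert_eq!(
            parsed,
            vec![
                ("v1".to_owned(), Transform::Identity),
                ("v2".to_owned(), Transform::Identity),
                ("v3".to_owned(), Transform::Bucket(5)),
                ("v4".to_owned(), Transform::Truncate(4)),
                ("v5".to_owned(), Transform::Year),
            ]
        );

        // `bucket` and `truncate` require the `n` parameter.
        assert!(parse_partition_by_exprs("bucket(v1)".to_owned()).is_err());
    }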
3168
3169    async fn test_create_catalog(configs: BTreeMap<String, String>) {
3170        let iceberg_config = IcebergConfig::from_btreemap(configs).unwrap();
3171
3172        let _table = iceberg_config.load_table().await.unwrap();
3173    }
3174
3175    #[tokio::test]
3176    #[ignore]
3177    async fn test_storage_catalog() {
3178        let values = [
3179            ("connector", "iceberg"),
3180            ("type", "append-only"),
3181            ("force_append_only", "true"),
3182            ("s3.endpoint", "http://127.0.0.1:9301"),
3183            ("s3.access.key", "hummockadmin"),
3184            ("s3.secret.key", "hummockadmin"),
3185            ("s3.region", "us-east-1"),
3186            ("s3.path.style.access", "true"),
3187            ("catalog.name", "demo"),
3188            ("catalog.type", "storage"),
3189            ("warehouse.path", "s3://icebergdata/demo"),
3190            ("database.name", "s1"),
3191            ("table.name", "t1"),
3192        ]
3193        .into_iter()
3194        .map(|(k, v)| (k.to_owned(), v.to_owned()))
3195        .collect();
3196
3197        test_create_catalog(values).await;
3198    }
3199
3200    #[tokio::test]
3201    #[ignore]
3202    async fn test_rest_catalog() {
3203        let values = [
3204            ("connector", "iceberg"),
3205            ("type", "append-only"),
3206            ("force_append_only", "true"),
3207            ("s3.endpoint", "http://127.0.0.1:9301"),
3208            ("s3.access.key", "hummockadmin"),
3209            ("s3.secret.key", "hummockadmin"),
3210            ("s3.region", "us-east-1"),
3211            ("s3.path.style.access", "true"),
3212            ("catalog.name", "demo"),
3213            ("catalog.type", "rest"),
3214            ("catalog.uri", "http://192.168.167.4:8181"),
3215            ("warehouse.path", "s3://icebergdata/demo"),
3216            ("database.name", "s1"),
3217            ("table.name", "t1"),
3218        ]
3219        .into_iter()
3220        .map(|(k, v)| (k.to_owned(), v.to_owned()))
3221        .collect();
3222
3223        test_create_catalog(values).await;
3224    }
3225
3226    #[tokio::test]
3227    #[ignore]
3228    async fn test_jdbc_catalog() {
3229        let values = [
3230            ("connector", "iceberg"),
3231            ("type", "append-only"),
3232            ("force_append_only", "true"),
3233            ("s3.endpoint", "http://127.0.0.1:9301"),
3234            ("s3.access.key", "hummockadmin"),
3235            ("s3.secret.key", "hummockadmin"),
3236            ("s3.region", "us-east-1"),
3237            ("s3.path.style.access", "true"),
3238            ("catalog.name", "demo"),
3239            ("catalog.type", "jdbc"),
3240            ("catalog.uri", "jdbc:postgresql://localhost:5432/iceberg"),
3241            ("catalog.jdbc.user", "admin"),
3242            ("catalog.jdbc.password", "123456"),
3243            ("warehouse.path", "s3://icebergdata/demo"),
3244            ("database.name", "s1"),
3245            ("table.name", "t1"),
3246        ]
3247        .into_iter()
3248        .map(|(k, v)| (k.to_owned(), v.to_owned()))
3249        .collect();
3250
3251        test_create_catalog(values).await;
3252    }
3253
3254    #[tokio::test]
3255    #[ignore]
3256    async fn test_hive_catalog() {
3257        let values = [
3258            ("connector", "iceberg"),
3259            ("type", "append-only"),
3260            ("force_append_only", "true"),
3261            ("s3.endpoint", "http://127.0.0.1:9301"),
3262            ("s3.access.key", "hummockadmin"),
3263            ("s3.secret.key", "hummockadmin"),
3264            ("s3.region", "us-east-1"),
3265            ("s3.path.style.access", "true"),
3266            ("catalog.name", "demo"),
3267            ("catalog.type", "hive"),
3268            ("catalog.uri", "thrift://localhost:9083"),
3269            ("warehouse.path", "s3://icebergdata/demo"),
3270            ("database.name", "s1"),
3271            ("table.name", "t1"),
3272        ]
3273        .into_iter()
3274        .map(|(k, v)| (k.to_owned(), v.to_owned()))
3275        .collect();
3276
3277        test_create_catalog(values).await;
3278    }
3279
3280    /// Test parsing Google authentication configuration with custom scopes.
3281    #[test]
3282    fn test_parse_google_auth_with_custom_scopes() {
3283        let values: BTreeMap<String, String> = [
3284            ("connector", "iceberg"),
3285            ("type", "append-only"),
3286            ("force_append_only", "true"),
3287            ("catalog.name", "biglake-catalog"),
3288            ("catalog.type", "rest"),
3289            (
3290                "catalog.uri",
3291                "https://biglake.googleapis.com/iceberg/v1/restcatalog",
3292            ),
3293            ("warehouse.path", "bq://projects/my-gcp-project"),
3294            ("catalog.security", "google"),
3295            ("gcp.auth.scopes", "https://www.googleapis.com/auth/cloud-platform,https://www.googleapis.com/auth/bigquery"),
3296            ("database.name", "my_dataset"),
3297            ("table.name", "my_table"),
3298        ]
3299        .into_iter()
3300        .map(|(k, v)| (k.to_owned(), v.to_owned()))
3301        .collect();
3302
3303        let iceberg_config = IcebergConfig::from_btreemap(values).unwrap();
3304
3305        // Verify catalog type
3306        assert_eq!(iceberg_config.catalog_type(), "rest");
3307
3308        // Verify Google-specific options
3309        assert_eq!(
3310            iceberg_config.common.catalog_security.as_deref(),
3311            Some("google")
3312        );
3313        assert_eq!(
3314            iceberg_config.common.gcp_auth_scopes.as_deref(),
3315            Some(
3316                "https://www.googleapis.com/auth/cloud-platform,https://www.googleapis.com/auth/bigquery"
3317            )
3318        );
3319
3320        // Verify warehouse path with bq:// prefix
3321        assert_eq!(
3322            iceberg_config.common.warehouse_path.as_deref(),
3323            Some("bq://projects/my-gcp-project")
3324        );
3325    }
3326
3327    /// Test parsing BigLake/Google Cloud REST catalog configuration.
3328    #[test]
3329    fn test_parse_biglake_google_auth_config() {
3330        let values: BTreeMap<String, String> = [
3331            ("connector", "iceberg"),
3332            ("type", "append-only"),
3333            ("force_append_only", "true"),
3334            ("catalog.name", "biglake-catalog"),
3335            ("catalog.type", "rest"),
3336            (
3337                "catalog.uri",
3338                "https://biglake.googleapis.com/iceberg/v1/restcatalog",
3339            ),
3340            ("warehouse.path", "bq://projects/my-gcp-project"),
3341            (
3342                "catalog.header",
3343                "x-goog-user-project=my-gcp-project",
3344            ),
3345            ("catalog.security", "google"),
3346            ("gcp.auth.scopes", "https://www.googleapis.com/auth/cloud-platform,https://www.googleapis.com/auth/bigquery"),
3347            ("database.name", "my_dataset"),
3348            ("table.name", "my_table"),
3349        ]
3350        .into_iter()
3351        .map(|(k, v)| (k.to_owned(), v.to_owned()))
3352        .collect();
3353
3354        let iceberg_config = IcebergConfig::from_btreemap(values).unwrap();
3355
3356        // Verify catalog type
3357        assert_eq!(iceberg_config.catalog_type(), "rest");
3358
3359        // Verify Google-specific options
3360        assert_eq!(
3361            iceberg_config.common.catalog_security.as_deref(),
3362            Some("google")
3363        );
3364        assert_eq!(
3365            iceberg_config.common.gcp_auth_scopes.as_deref(),
3366            Some(
3367                "https://www.googleapis.com/auth/cloud-platform,https://www.googleapis.com/auth/bigquery"
3368            )
3369        );
3370
3371        // Verify warehouse path with bq:// prefix
3372        assert_eq!(
3373            iceberg_config.common.warehouse_path.as_deref(),
3374            Some("bq://projects/my-gcp-project")
3375        );
3376
3377        // Verify custom header
3378        assert_eq!(
3379            iceberg_config.common.catalog_header.as_deref(),
3380            Some("x-goog-user-project=my-gcp-project")
3381        );
3382    }
3383
3384    /// Test parsing `oauth2` security configuration.
3385    #[test]
3386    fn test_parse_oauth2_security_config() {
3387        let values: BTreeMap<String, String> = [
3388            ("connector", "iceberg"),
3389            ("type", "append-only"),
3390            ("force_append_only", "true"),
3391            ("catalog.name", "oauth2-catalog"),
3392            ("catalog.type", "rest"),
3393            ("catalog.uri", "https://example.com/iceberg/rest"),
3394            ("warehouse.path", "s3://my-bucket/warehouse"),
3395            ("catalog.security", "oauth2"),
3396            ("catalog.credential", "client_id:client_secret"),
3397            ("catalog.token", "bearer-token"),
3398            (
3399                "catalog.oauth2_server_uri",
3400                "https://oauth.example.com/token",
3401            ),
3402            ("catalog.scope", "read write"),
3403            ("database.name", "test_db"),
3404            ("table.name", "test_table"),
3405        ]
3406        .into_iter()
3407        .map(|(k, v)| (k.to_owned(), v.to_owned()))
3408        .collect();
3409
3410        let iceberg_config = IcebergConfig::from_btreemap(values).unwrap();
3411
3412        // Verify catalog type
3413        assert_eq!(iceberg_config.catalog_type(), "rest");
3414
3415        // Verify OAuth2-specific options
3416        assert_eq!(
3417            iceberg_config.common.catalog_security.as_deref(),
3418            Some("oauth2")
3419        );
3420        assert_eq!(
3421            iceberg_config.common.catalog_credential.as_deref(),
3422            Some("client_id:client_secret")
3423        );
3424        assert_eq!(
3425            iceberg_config.common.catalog_token.as_deref(),
3426            Some("bearer-token")
3427        );
3428        assert_eq!(
3429            iceberg_config.common.catalog_oauth2_server_uri.as_deref(),
3430            Some("https://oauth.example.com/token")
3431        );
3432        assert_eq!(
3433            iceberg_config.common.catalog_scope.as_deref(),
3434            Some("read write")
3435        );
3436    }
3437
3438    /// Test parsing invalid security configuration.
3439    #[test]
3440    fn test_parse_invalid_security_config() {
3441        let values: BTreeMap<String, String> = [
3442            ("connector", "iceberg"),
3443            ("type", "append-only"),
3444            ("force_append_only", "true"),
3445            ("catalog.name", "invalid-catalog"),
3446            ("catalog.type", "rest"),
3447            ("catalog.uri", "https://example.com/iceberg/rest"),
3448            ("warehouse.path", "s3://my-bucket/warehouse"),
3449            ("catalog.security", "invalid_security_type"),
3450            ("database.name", "test_db"),
3451            ("table.name", "test_table"),
3452        ]
3453        .into_iter()
3454        .map(|(k, v)| (k.to_owned(), v.to_owned()))
3455        .collect();
3456
3457        // This should still parse successfully, but with a warning for unknown security type
3458        let iceberg_config = IcebergConfig::from_btreemap(values).unwrap();
3459
3460        // Verify that the invalid security type is still stored
3461        assert_eq!(
3462            iceberg_config.common.catalog_security.as_deref(),
3463            Some("invalid_security_type")
3464        );
3465
3466        // Verify catalog type
3467        assert_eq!(iceberg_config.catalog_type(), "rest");
3468    }
3469
3470    /// Test parsing custom `FileIO` implementation configuration.
3471    #[test]
3472    fn test_parse_custom_io_impl_config() {
3473        let values: BTreeMap<String, String> = [
3474            ("connector", "iceberg"),
3475            ("type", "append-only"),
3476            ("force_append_only", "true"),
3477            ("catalog.name", "gcs-catalog"),
3478            ("catalog.type", "rest"),
3479            ("catalog.uri", "https://example.com/iceberg/rest"),
3480            ("warehouse.path", "gs://my-bucket/warehouse"),
3481            ("catalog.security", "google"),
3482            ("catalog.io_impl", "org.apache.iceberg.gcp.gcs.GCSFileIO"),
3483            ("database.name", "test_db"),
3484            ("table.name", "test_table"),
3485        ]
3486        .into_iter()
3487        .map(|(k, v)| (k.to_owned(), v.to_owned()))
3488        .collect();
3489
3490        let iceberg_config = IcebergConfig::from_btreemap(values).unwrap();
3491
3492        // Verify catalog type
3493        assert_eq!(iceberg_config.catalog_type(), "rest");
3494
3495        // Verify custom `FileIO` implementation
3496        assert_eq!(
3497            iceberg_config.common.catalog_io_impl.as_deref(),
3498            Some("org.apache.iceberg.gcp.gcs.GCSFileIO")
3499        );
3500
3501        // Verify Google security is set
3502        assert_eq!(
3503            iceberg_config.common.catalog_security.as_deref(),
3504            Some("google")
3505        );
3506    }
3507
3508    #[test]
3509    fn test_config_constants_consistency() {
3510        // This test ensures our constants match the expected configuration names
3511        // If you change a constant, this test will remind you to update both places
3512        assert_eq!(ENABLE_COMPACTION, "enable_compaction");
3513        assert_eq!(COMPACTION_INTERVAL_SEC, "compaction_interval_sec");
3514        assert_eq!(ENABLE_SNAPSHOT_EXPIRATION, "enable_snapshot_expiration");
3515        assert_eq!(WRITE_MODE, "write_mode");
3516        assert_eq!(
3517            SNAPSHOT_EXPIRATION_RETAIN_LAST,
3518            "snapshot_expiration_retain_last"
3519        );
3520        assert_eq!(
3521            SNAPSHOT_EXPIRATION_MAX_AGE_MILLIS,
3522            "snapshot_expiration_max_age_millis"
3523        );
3524        assert_eq!(
3525            SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_FILES,
3526            "snapshot_expiration_clear_expired_files"
3527        );
3528        assert_eq!(
3529            SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_META_DATA,
3530            "snapshot_expiration_clear_expired_meta_data"
3531        );
3532        assert_eq!(COMPACTION_MAX_SNAPSHOTS_NUM, "compaction.max_snapshots_num");
3533    }
3534
3535    #[test]
3536    fn test_parse_iceberg_compaction_config() {
3537        // Test parsing with new compaction.* prefix config names
3538        let values = [
3539            ("connector", "iceberg"),
3540            ("type", "upsert"),
3541            ("primary_key", "id"),
3542            ("warehouse.path", "s3://iceberg"),
3543            ("s3.endpoint", "http://127.0.0.1:9301"),
3544            ("s3.access.key", "test"),
3545            ("s3.secret.key", "test"),
3546            ("s3.region", "us-east-1"),
3547            ("catalog.type", "storage"),
3548            ("catalog.name", "demo"),
3549            ("database.name", "test_db"),
3550            ("table.name", "test_table"),
3551            ("enable_compaction", "true"),
3552            ("compaction.max_snapshots_num", "100"),
3553            ("compaction.small_files_threshold_mb", "512"),
3554            ("compaction.delete_files_count_threshold", "50"),
3555            ("compaction.trigger_snapshot_count", "10"),
3556            ("compaction.target_file_size_mb", "256"),
3557            ("compaction.type", "full"),
3558        ]
3559        .into_iter()
3560        .map(|(k, v)| (k.to_owned(), v.to_owned()))
3561        .collect();
3562
3563        let iceberg_config = IcebergConfig::from_btreemap(values).unwrap();
3564
3565        // Verify all compaction config fields are parsed correctly
3566        assert!(iceberg_config.enable_compaction);
3567        assert_eq!(
3568            iceberg_config.max_snapshots_num_before_compaction,
3569            Some(100)
3570        );
3571        assert_eq!(iceberg_config.small_files_threshold_mb, Some(512));
3572        assert_eq!(iceberg_config.delete_files_count_threshold, Some(50));
3573        assert_eq!(iceberg_config.trigger_snapshot_count, Some(10));
3574        assert_eq!(iceberg_config.target_file_size_mb, Some(256));
3575        assert_eq!(iceberg_config.compaction_type, Some(CompactionType::Full));
3576    }
3577
3578    #[test]
3579    fn test_append_only_rejects_copy_on_write() {
3580        // Test that append-only sinks reject copy-on-write mode
3581        let values = [
3582            ("connector", "iceberg"),
3583            ("type", "append-only"),
3584            ("warehouse.path", "s3://iceberg"),
3585            ("s3.endpoint", "http://127.0.0.1:9301"),
3586            ("s3.access.key", "test"),
3587            ("s3.secret.key", "test"),
3588            ("s3.region", "us-east-1"),
3589            ("catalog.type", "storage"),
3590            ("catalog.name", "demo"),
3591            ("database.name", "test_db"),
3592            ("table.name", "test_table"),
3593            ("write_mode", "copy-on-write"),
3594        ]
3595        .into_iter()
3596        .map(|(k, v)| (k.to_owned(), v.to_owned()))
3597        .collect();
3598
3599        let result = IcebergConfig::from_btreemap(values);
3600        assert!(result.is_err());
3601        assert!(
3602            result
3603                .unwrap_err()
3604                .to_string()
3605                .contains("'copy-on-write' mode is not supported for append-only iceberg sink")
3606        );
3607    }
3608
3609    #[test]
3610    fn test_append_only_accepts_merge_on_read() {
3611        // Test that append-only sinks accept merge-on-read mode (explicit)
3612        let values = [
3613            ("connector", "iceberg"),
3614            ("type", "append-only"),
3615            ("warehouse.path", "s3://iceberg"),
3616            ("s3.endpoint", "http://127.0.0.1:9301"),
3617            ("s3.access.key", "test"),
3618            ("s3.secret.key", "test"),
3619            ("s3.region", "us-east-1"),
3620            ("catalog.type", "storage"),
3621            ("catalog.name", "demo"),
3622            ("database.name", "test_db"),
3623            ("table.name", "test_table"),
3624            ("write_mode", "merge-on-read"),
3625        ]
3626        .into_iter()
3627        .map(|(k, v)| (k.to_owned(), v.to_owned()))
3628        .collect();
3629
3630        let result = IcebergConfig::from_btreemap(values);
3631        assert!(result.is_ok());
3632        let config = result.unwrap();
3633        assert_eq!(config.write_mode, IcebergWriteMode::MergeOnRead);
3634    }
3635
3636    #[test]
3637    fn test_append_only_defaults_to_merge_on_read() {
3638        // Test that append-only sinks default to merge-on-read when write_mode is not specified
3639        let values = [
3640            ("connector", "iceberg"),
3641            ("type", "append-only"),
3642            ("warehouse.path", "s3://iceberg"),
3643            ("s3.endpoint", "http://127.0.0.1:9301"),
3644            ("s3.access.key", "test"),
3645            ("s3.secret.key", "test"),
3646            ("s3.region", "us-east-1"),
3647            ("catalog.type", "storage"),
3648            ("catalog.name", "demo"),
3649            ("database.name", "test_db"),
3650            ("table.name", "test_table"),
3651        ]
3652        .into_iter()
3653        .map(|(k, v)| (k.to_owned(), v.to_owned()))
3654        .collect();
3655
3656        let result = IcebergConfig::from_btreemap(values);
3657        assert!(result.is_ok());
3658        let config = result.unwrap();
3659        assert_eq!(config.write_mode, IcebergWriteMode::MergeOnRead);
3660    }
3661
3662    #[test]
3663    fn test_upsert_accepts_copy_on_write() {
3664        // Test that upsert sinks accept copy-on-write mode
3665        let values = [
3666            ("connector", "iceberg"),
3667            ("type", "upsert"),
3668            ("primary_key", "id"),
3669            ("warehouse.path", "s3://iceberg"),
3670            ("s3.endpoint", "http://127.0.0.1:9301"),
3671            ("s3.access.key", "test"),
3672            ("s3.secret.key", "test"),
3673            ("s3.region", "us-east-1"),
3674            ("catalog.type", "storage"),
3675            ("catalog.name", "demo"),
3676            ("database.name", "test_db"),
3677            ("table.name", "test_table"),
3678            ("write_mode", "copy-on-write"),
3679        ]
3680        .into_iter()
3681        .map(|(k, v)| (k.to_owned(), v.to_owned()))
3682        .collect();
3683
3684        let result = IcebergConfig::from_btreemap(values);
3685        assert!(result.is_ok());
3686        let config = result.unwrap();
3687        assert_eq!(config.write_mode, IcebergWriteMode::CopyOnWrite);
3688    }
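
    /// Illustrative check of the copy-on-write branch selection helpers; a minimal
    /// sketch that assumes the sink type strings used in the configs above
    /// ("upsert", "append-only").
    #[test]
    fn test_commit_branch_selection() {
        use iceberg::spec::MAIN_BRANCH;

        use super::{ICEBERG_COW_BRANCH, commit_branch, should_enable_iceberg_cow};

        // Copy-on-write only takes effect for upsert sinks.
        assert!(should_enable_iceberg_cow(
            "upsert",
            IcebergWriteMode::CopyOnWrite
        ));
        assert!(!should_enable_iceberg_cow(
            "upsert",
            IcebergWriteMode::MergeOnRead
        ));
        assert!(!should_enable_iceberg_cow(
            "append-only",
            IcebergWriteMode::CopyOnWrite
        ));

        // The commit branch follows the same rule.
        assert_eq!(
            commit_branch("upsert", IcebergWriteMode::CopyOnWrite),
            ICEBERG_COW_BRANCH
        );
        assert_eq!(
            commit_branch("upsert", IcebergWriteMode::MergeOnRead),
            MAIN_BRANCH
        );
    }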
3689}