risingwave_connector/sink/iceberg/
config.rs

// Copyright 2026 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

15use std::collections::{BTreeMap, HashMap};
16use std::fmt::Debug;
17use std::sync::Arc;
18
19use anyhow::anyhow;
20use iceberg::spec::{FormatVersion, MAIN_BRANCH};
21use iceberg::table::Table;
22use iceberg::{Catalog, TableIdent};
23use parquet::basic::Compression;
24use serde::de::{self, Deserializer, Visitor};
25use serde::{Deserialize, Serialize};
26use serde_with::{DisplayFromStr, serde_as};
27use with_options::WithOptions;
28
29use super::{SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT, SinkError};
30use crate::connector_common::{IcebergCommon, IcebergTableIdentifier};
31use crate::enforce_secret::EnforceSecret;
32use crate::sink::Result;
33use crate::sink::decouple_checkpoint_log_sink::iceberg_default_commit_checkpoint_interval;
34use crate::{deserialize_bool_from_string, deserialize_optional_string_seq_from_string};
35
/// Branch that upsert sinks running in copy-on-write mode commit to (see `commit_branch`).
pub const ICEBERG_COW_BRANCH: &str = "ingestion";

/// Option value selecting `IcebergWriteMode::MergeOnRead`.
pub const ICEBERG_WRITE_MODE_MERGE_ON_READ: &str = "merge-on-read";
/// Option value selecting `IcebergWriteMode::CopyOnWrite`.
pub const ICEBERG_WRITE_MODE_COPY_ON_WRITE: &str = "copy-on-write";

/// Option value selecting `CompactionType::Full`.
pub const ICEBERG_COMPACTION_TYPE_FULL: &str = "full";
/// Option value selecting `CompactionType::SmallFiles`.
pub const ICEBERG_COMPACTION_TYPE_SMALL_FILES: &str = "small-files";
/// Option value selecting `CompactionType::FilesWithDelete`.
pub const ICEBERG_COMPACTION_TYPE_FILES_WITH_DELETE: &str = "files-with-delete";

// NOTE(review): not used in this file; presumably mirrors the Iceberg convention that
// partition field ids start at 1000 — confirm at the use site.
pub const PARTITION_DATA_ID_START: i32 = 1000;
45
46#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default, Serialize, Deserialize)]
47#[serde(rename_all = "kebab-case")]
48pub enum IcebergWriteMode {
49    #[default]
50    MergeOnRead,
51    CopyOnWrite,
52}
53
54impl IcebergWriteMode {
55    pub fn as_str(self) -> &'static str {
56        match self {
57            IcebergWriteMode::MergeOnRead => ICEBERG_WRITE_MODE_MERGE_ON_READ,
58            IcebergWriteMode::CopyOnWrite => ICEBERG_WRITE_MODE_COPY_ON_WRITE,
59        }
60    }
61}
62
63impl std::str::FromStr for IcebergWriteMode {
64    type Err = SinkError;
65
66    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
67        match s {
68            ICEBERG_WRITE_MODE_MERGE_ON_READ => Ok(IcebergWriteMode::MergeOnRead),
69            ICEBERG_WRITE_MODE_COPY_ON_WRITE => Ok(IcebergWriteMode::CopyOnWrite),
70            _ => Err(SinkError::Config(anyhow!(format!(
71                "invalid write_mode: {}, must be one of: {}, {}",
72                s, ICEBERG_WRITE_MODE_MERGE_ON_READ, ICEBERG_WRITE_MODE_COPY_ON_WRITE
73            )))),
74        }
75    }
76}
77
78impl TryFrom<&str> for IcebergWriteMode {
79    type Error = <Self as std::str::FromStr>::Err;
80
81    fn try_from(value: &str) -> std::result::Result<Self, Self::Error> {
82        value.parse()
83    }
84}
85
86impl TryFrom<String> for IcebergWriteMode {
87    type Error = <Self as std::str::FromStr>::Err;
88
89    fn try_from(value: String) -> std::result::Result<Self, Self::Error> {
90        value.as_str().parse()
91    }
92}
93
94impl std::fmt::Display for IcebergWriteMode {
95    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
96        f.write_str(self.as_str())
97    }
98}
99
// Option keys of the Iceberg sink. Each one equals the serde name of the matching
// `IcebergConfig` field.

// General options.
pub const ENABLE_COMPACTION: &str = "enable_compaction";
pub const COMPACTION_INTERVAL_SEC: &str = "compaction_interval_sec";
pub const ENABLE_SNAPSHOT_EXPIRATION: &str = "enable_snapshot_expiration";
pub const WRITE_MODE: &str = "write_mode";
pub const FORMAT_VERSION: &str = "format_version";
pub const ORDER_KEY: &str = "order_key";
pub const ENABLE_PK_INDEX: &str = "enable_pk_index";

// Snapshot-expiration options.
pub const SNAPSHOT_EXPIRATION_RETAIN_LAST: &str = "snapshot_expiration_retain_last";
pub const SNAPSHOT_EXPIRATION_MAX_AGE_MILLIS: &str = "snapshot_expiration_max_age_millis";
pub const SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_FILES: &str = "snapshot_expiration_clear_expired_files";
pub const SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_META_DATA: &str =
    "snapshot_expiration_clear_expired_meta_data";

// Compaction options, all namespaced under `compaction.`.
pub const COMPACTION_MAX_SNAPSHOTS_NUM: &str = "compaction.max_snapshots_num";
pub const COMPACTION_SMALL_FILES_THRESHOLD_MB: &str = "compaction.small_files_threshold_mb";
pub const COMPACTION_DELETE_FILES_COUNT_THRESHOLD: &str = "compaction.delete_files_count_threshold";
pub const COMPACTION_TRIGGER_SNAPSHOT_COUNT: &str = "compaction.trigger_snapshot_count";
pub const COMPACTION_TARGET_FILE_SIZE_MB: &str = "compaction.target_file_size_mb";
pub const COMPACTION_TYPE: &str = "compaction.type";
pub const COMPACTION_WRITE_PARQUET_COMPRESSION: &str = "compaction.write_parquet_compression";
pub const COMPACTION_WRITE_PARQUET_MAX_ROW_GROUP_ROWS: &str =
    "compaction.write_parquet_max_row_group_rows";
pub const COMPACTION_WRITE_PARQUET_MAX_ROW_GROUP_BYTES: &str =
    "compaction.write_parquet_max_row_group_bytes";

/// Default maximum Parquet row-group size in bytes (128 MiB), used when
/// `compaction.write_parquet_max_row_group_bytes` is not set.
pub const ICEBERG_DEFAULT_WRITE_PARQUET_MAX_ROW_GROUP_BYTES: usize = 128 * 1024 * 1024;
131
132pub(super) const PARQUET_CREATED_BY: &str =
133    concat!("risingwave version ", env!("CARGO_PKG_VERSION"));
134
/// Serde default for `commit_retry_num`.
fn default_commit_retry_num() -> u32 {
    const DEFAULT_COMMIT_RETRY_NUM: u32 = 8;
    DEFAULT_COMMIT_RETRY_NUM
}
138
139fn default_iceberg_write_mode() -> IcebergWriteMode {
140    IcebergWriteMode::MergeOnRead
141}
142
143fn default_iceberg_format_version() -> FormatVersion {
144    FormatVersion::V2
145}
146
/// Serde default for boolean options that are enabled unless explicitly turned off.
fn default_true() -> bool {
    true
}
150
/// Serde default for optional boolean options that are enabled when absent.
fn default_some_true() -> Option<bool> {
    Option::from(true)
}
154
155fn parse_format_version_str(value: &str) -> std::result::Result<FormatVersion, String> {
156    let parsed = value
157        .trim()
158        .parse::<u8>()
159        .map_err(|_| "`format-version` must be one of 1, 2, or 3".to_owned())?;
160    match parsed {
161        1 => Ok(FormatVersion::V1),
162        2 => Ok(FormatVersion::V2),
163        3 => Ok(FormatVersion::V3),
164        _ => Err("`format-version` must be one of 1, 2, or 3".to_owned()),
165    }
166}
167
168fn deserialize_format_version<'de, D>(
169    deserializer: D,
170) -> std::result::Result<FormatVersion, D::Error>
171where
172    D: Deserializer<'de>,
173{
174    struct FormatVersionVisitor;
175
176    impl<'de> Visitor<'de> for FormatVersionVisitor {
177        type Value = FormatVersion;
178
179        fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
180            formatter.write_str("format-version as 1, 2, or 3")
181        }
182
183        fn visit_u64<E>(self, value: u64) -> std::result::Result<Self::Value, E>
184        where
185            E: de::Error,
186        {
187            let value = u8::try_from(value)
188                .map_err(|_| E::custom("`format-version` must be one of 1, 2, or 3"))?;
189            parse_format_version_str(&value.to_string()).map_err(E::custom)
190        }
191
192        fn visit_i64<E>(self, value: i64) -> std::result::Result<Self::Value, E>
193        where
194            E: de::Error,
195        {
196            let value = u8::try_from(value)
197                .map_err(|_| E::custom("`format-version` must be one of 1, 2, or 3"))?;
198            parse_format_version_str(&value.to_string()).map_err(E::custom)
199        }
200
201        fn visit_str<E>(self, value: &str) -> std::result::Result<Self::Value, E>
202        where
203            E: de::Error,
204        {
205            parse_format_version_str(value).map_err(E::custom)
206        }
207
208        fn visit_string<E>(self, value: String) -> std::result::Result<Self::Value, E>
209        where
210            E: de::Error,
211        {
212            self.visit_str(&value)
213        }
214    }
215
216    deserializer.deserialize_any(FormatVersionVisitor)
217}
218
219/// Compaction type for Iceberg sink
220#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
221#[serde(rename_all = "kebab-case")]
222pub enum CompactionType {
223    /// Full compaction - rewrites all data files
224    #[default]
225    Full,
226    /// Small files compaction - only compact small files
227    SmallFiles,
228    /// Files with delete compaction - only compact files that have associated delete files
229    FilesWithDelete,
230}
231
232impl CompactionType {
233    pub fn as_str(&self) -> &'static str {
234        match self {
235            CompactionType::Full => ICEBERG_COMPACTION_TYPE_FULL,
236            CompactionType::SmallFiles => ICEBERG_COMPACTION_TYPE_SMALL_FILES,
237            CompactionType::FilesWithDelete => ICEBERG_COMPACTION_TYPE_FILES_WITH_DELETE,
238        }
239    }
240}
241
242impl std::str::FromStr for CompactionType {
243    type Err = SinkError;
244
245    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
246        match s {
247            ICEBERG_COMPACTION_TYPE_FULL => Ok(CompactionType::Full),
248            ICEBERG_COMPACTION_TYPE_SMALL_FILES => Ok(CompactionType::SmallFiles),
249            ICEBERG_COMPACTION_TYPE_FILES_WITH_DELETE => Ok(CompactionType::FilesWithDelete),
250            _ => Err(SinkError::Config(anyhow!(format!(
251                "invalid compaction_type: {}, must be one of: {}, {}, {}",
252                s,
253                ICEBERG_COMPACTION_TYPE_FULL,
254                ICEBERG_COMPACTION_TYPE_SMALL_FILES,
255                ICEBERG_COMPACTION_TYPE_FILES_WITH_DELETE
256            )))),
257        }
258    }
259}
260
261impl TryFrom<&str> for CompactionType {
262    type Error = <Self as std::str::FromStr>::Err;
263
264    fn try_from(value: &str) -> std::result::Result<Self, Self::Error> {
265        value.parse()
266    }
267}
268
269impl TryFrom<String> for CompactionType {
270    type Error = <Self as std::str::FromStr>::Err;
271
272    fn try_from(value: String) -> std::result::Result<Self, Self::Error> {
273        value.as_str().parse()
274    }
275}
276
277impl std::fmt::Display for CompactionType {
278    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
279        write!(f, "{}", self.as_str())
280    }
281}
282
283#[serde_as]
284#[derive(Debug, Clone, PartialEq, Eq, Deserialize, WithOptions)]
285pub struct IcebergConfig {
286    pub r#type: String, // accept "append-only" or "upsert"
287
288    #[serde(default, deserialize_with = "deserialize_bool_from_string")]
289    pub force_append_only: bool,
290
291    #[serde(flatten)]
292    pub(crate) common: IcebergCommon,
293
294    #[serde(flatten)]
295    pub(crate) table: IcebergTableIdentifier,
296
297    #[serde(
298        rename = "primary_key",
299        default,
300        deserialize_with = "deserialize_optional_string_seq_from_string"
301    )]
302    pub primary_key: Option<Vec<String>>,
303
304    // Props for java catalog props.
305    #[serde(skip)]
306    pub java_catalog_props: HashMap<String, String>,
307
308    #[serde(default)]
309    pub partition_by: Option<String>,
310
311    #[serde(default)]
312    pub order_key: Option<String>,
313
314    /// Commit every n(>0) checkpoints, default is 60.
315    #[serde(default = "iceberg_default_commit_checkpoint_interval")]
316    #[serde_as(as = "DisplayFromStr")]
317    #[with_option(allow_alter_on_fly)]
318    pub commit_checkpoint_interval: u64,
319
320    #[serde(default, deserialize_with = "deserialize_bool_from_string")]
321    pub create_table_if_not_exists: bool,
322
323    /// Whether it is `exactly_once`, the default is true.
324    #[serde(default = "default_some_true")]
325    #[serde_as(as = "Option<DisplayFromStr>")]
326    pub is_exactly_once: Option<bool>,
327    // Retry commit num when iceberg commit fail. default is 8.
328    // # TODO
329    // Iceberg table may store the retry commit num in table meta.
330    // We should try to find and use that as default commit retry num first.
331    #[serde(default = "default_commit_retry_num")]
332    pub commit_retry_num: u32,
333
334    /// Whether to enable iceberg compaction.
335    #[serde(
336        rename = "enable_compaction",
337        default,
338        deserialize_with = "deserialize_bool_from_string"
339    )]
340    #[with_option(allow_alter_on_fly)]
341    pub enable_compaction: bool,
342
343    /// The interval of iceberg compaction
344    #[serde(rename = "compaction_interval_sec", default)]
345    #[serde_as(as = "Option<DisplayFromStr>")]
346    #[with_option(allow_alter_on_fly)]
347    pub compaction_interval_sec: Option<u64>,
348
349    /// Whether to enable iceberg expired snapshots.
350    #[serde(
351        rename = "enable_snapshot_expiration",
352        default = "default_true",
353        deserialize_with = "deserialize_bool_from_string"
354    )]
355    #[with_option(allow_alter_on_fly)]
356    pub enable_snapshot_expiration: bool,
357
358    /// The iceberg write mode, can be `merge-on-read` or `copy-on-write`.
359    #[serde(rename = "write_mode", default = "default_iceberg_write_mode")]
360    pub write_mode: IcebergWriteMode,
361
362    /// Iceberg format version for table creation.
363    #[serde(
364        rename = "format_version",
365        default = "default_iceberg_format_version",
366        deserialize_with = "deserialize_format_version"
367    )]
368    pub format_version: FormatVersion,
369
370    /// The maximum age (in milliseconds) for snapshots before they expire
371    /// For example, if set to 3600000, snapshots older than 1 hour will be expired
372    #[serde(rename = "snapshot_expiration_max_age_millis", default)]
373    #[serde_as(as = "Option<DisplayFromStr>")]
374    #[with_option(allow_alter_on_fly)]
375    pub snapshot_expiration_max_age_millis: Option<i64>,
376
377    /// The number of snapshots to retain
378    #[serde(rename = "snapshot_expiration_retain_last", default)]
379    #[serde_as(as = "Option<DisplayFromStr>")]
380    #[with_option(allow_alter_on_fly)]
381    pub snapshot_expiration_retain_last: Option<i32>,
382
383    #[serde(
384        rename = "snapshot_expiration_clear_expired_files",
385        default = "default_true",
386        deserialize_with = "deserialize_bool_from_string"
387    )]
388    #[with_option(allow_alter_on_fly)]
389    pub snapshot_expiration_clear_expired_files: bool,
390
391    #[serde(
392        rename = "snapshot_expiration_clear_expired_meta_data",
393        default = "default_true",
394        deserialize_with = "deserialize_bool_from_string"
395    )]
396    #[with_option(allow_alter_on_fly)]
397    pub snapshot_expiration_clear_expired_meta_data: bool,
398
399    /// The maximum number of snapshots allowed since the last rewrite operation
400    /// If set, sink will check snapshot count and wait if exceeded
401    #[serde(rename = "compaction.max_snapshots_num", default)]
402    #[serde_as(as = "Option<DisplayFromStr>")]
403    #[with_option(allow_alter_on_fly)]
404    pub max_snapshots_num_before_compaction: Option<usize>,
405
406    #[serde(rename = "compaction.small_files_threshold_mb", default)]
407    #[serde_as(as = "Option<DisplayFromStr>")]
408    #[with_option(allow_alter_on_fly)]
409    pub small_files_threshold_mb: Option<u64>,
410
411    #[serde(rename = "compaction.delete_files_count_threshold", default)]
412    #[serde_as(as = "Option<DisplayFromStr>")]
413    #[with_option(allow_alter_on_fly)]
414    pub delete_files_count_threshold: Option<usize>,
415
416    #[serde(rename = "compaction.trigger_snapshot_count", default)]
417    #[serde_as(as = "Option<DisplayFromStr>")]
418    #[with_option(allow_alter_on_fly)]
419    pub trigger_snapshot_count: Option<usize>,
420
421    #[serde(rename = "compaction.target_file_size_mb", default)]
422    #[serde_as(as = "Option<DisplayFromStr>")]
423    #[with_option(allow_alter_on_fly)]
424    pub target_file_size_mb: Option<u64>,
425
426    /// Compaction type: `full`, `small-files`, or `files-with-delete`
427    /// If not set, will default to `full`
428    #[serde(rename = "compaction.type", default)]
429    #[with_option(allow_alter_on_fly)]
430    pub compaction_type: Option<CompactionType>,
431
432    /// Parquet compression codec
433    /// Supported values: uncompressed, snappy, gzip, lzo, brotli, lz4, zstd
434    /// Default is zstd
435    #[serde(rename = "compaction.write_parquet_compression", default)]
436    #[with_option(allow_alter_on_fly)]
437    pub write_parquet_compression: Option<String>,
438
439    /// Deprecated: maximum number of rows in a Parquet row group.
440    /// Accepted for backward compatibility, but ignored by the writer.
441    #[serde(rename = "compaction.write_parquet_max_row_group_rows", default)]
442    #[serde_as(as = "Option<DisplayFromStr>")]
443    #[with_option(allow_alter_on_fly)]
444    pub write_parquet_max_row_group_rows: Option<usize>,
445
446    /// Maximum size of a Parquet row group in bytes
447    /// Default is 128 `MiB`, matching Iceberg defaults.
448    #[serde(rename = "compaction.write_parquet_max_row_group_bytes", default)]
449    #[serde_as(as = "Option<DisplayFromStr>")]
450    #[with_option(allow_alter_on_fly)]
451    pub write_parquet_max_row_group_bytes: Option<usize>,
452
453    /// Whether to enable PK index for upsert sink. Default is false.
454    /// It's used for V3 upsert iceberg sink to generate delete vectors.
455    #[serde(
456        rename = "enable_pk_index",
457        default,
458        deserialize_with = "deserialize_bool_from_string"
459    )]
460    pub enable_pk_index: bool,
461}
462
463impl EnforceSecret for IcebergConfig {
464    fn enforce_secret<'a>(
465        prop_iter: impl Iterator<Item = &'a str>,
466    ) -> crate::error::ConnectorResult<()> {
467        for prop in prop_iter {
468            IcebergCommon::enforce_one(prop)?;
469        }
470        Ok(())
471    }
472
473    fn enforce_one(prop: &str) -> crate::error::ConnectorResult<()> {
474        IcebergCommon::enforce_one(prop)
475    }
476}
477
478impl IcebergConfig {
479    /// Validate that append-only sinks use merge-on-read mode
480    /// Copy-on-write is strictly worse than merge-on-read for append-only workloads
481    pub fn validate_append_only_write_mode(
482        sink_type: &str,
483        write_mode: IcebergWriteMode,
484    ) -> Result<()> {
485        if sink_type == SINK_TYPE_APPEND_ONLY && write_mode == IcebergWriteMode::CopyOnWrite {
486            return Err(SinkError::Config(anyhow!(
487                "'copy-on-write' mode is not supported for append-only iceberg sink. \
488                 Please use 'merge-on-read' instead, which is strictly better for append-only workloads."
489            )));
490        }
491        Ok(())
492    }
493
494    pub(crate) fn validate_enable_pk_index(&self) -> Result<()> {
495        if !self.enable_pk_index {
496            return Ok(());
497        }
498
499        if self.r#type != SINK_TYPE_UPSERT {
500            return Err(SinkError::Config(anyhow!(
501                "`enable_pk_index` is only supported for upsert iceberg sink"
502            )));
503        }
504
505        if self.write_mode != IcebergWriteMode::MergeOnRead {
506            return Err(SinkError::Config(anyhow!(
507                "`enable_pk_index` is only supported for upsert iceberg sink with merge-on-read mode"
508            )));
509        }
510
511        if self.format_version < FormatVersion::V3 {
512            return Err(SinkError::Config(anyhow!(
513                "`enable_pk_index` is only supported for upsert iceberg sink with format version >= 3"
514            )));
515        }
516
517        if self.force_append_only {
518            return Err(SinkError::Config(anyhow!(
519                "`enable_pk_index` cannot be true when `force_append_only` is true"
520            )));
521        }
522
523        Ok(())
524    }
525
526    pub fn from_btreemap(values: BTreeMap<String, String>) -> Result<Self> {
527        let mut config =
528            serde_json::from_value::<IcebergConfig>(serde_json::to_value(&values).unwrap())
529                .map_err(|e| SinkError::Config(anyhow!(e)))?;
530
531        if config.r#type != SINK_TYPE_APPEND_ONLY && config.r#type != SINK_TYPE_UPSERT {
532            return Err(SinkError::Config(anyhow!(
533                "`{}` must be {}, or {}",
534                SINK_TYPE_OPTION,
535                SINK_TYPE_APPEND_ONLY,
536                SINK_TYPE_UPSERT
537            )));
538        }
539
540        if config.r#type == SINK_TYPE_UPSERT {
541            if let Some(primary_key) = &config.primary_key {
542                if primary_key.is_empty() {
543                    return Err(SinkError::Config(anyhow!(
544                        "`primary-key` must not be empty in {}",
545                        SINK_TYPE_UPSERT
546                    )));
547                }
548            } else {
549                return Err(SinkError::Config(anyhow!(
550                    "Must set `primary-key` in {}",
551                    SINK_TYPE_UPSERT
552                )));
553            }
554        }
555
556        // Enforce merge-on-read for append-only sinks
557        Self::validate_append_only_write_mode(&config.r#type, config.write_mode)?;
558        config.validate_enable_pk_index()?;
559
560        // All configs start with "catalog." will be treated as java configs.
561        config.java_catalog_props = values
562            .iter()
563            .filter(|(k, _v)| {
564                k.starts_with("catalog.")
565                    && k != &"catalog.uri"
566                    && k != &"catalog.type"
567                    && k != &"catalog.name"
568                    && k != &"catalog.header"
569            })
570            .map(|(k, v)| (k[8..].to_string(), v.clone()))
571            .collect();
572
573        if config.commit_checkpoint_interval == 0 {
574            return Err(SinkError::Config(anyhow!(
575                "`commit-checkpoint-interval` must be greater than 0"
576            )));
577        }
578
579        if config.trigger_snapshot_count == Some(0) {
580            return Err(SinkError::Config(anyhow!(
581                "`compaction.trigger_snapshot_count` must be greater than 0"
582            )));
583        }
584
585        // Validate table identifier (e.g., database.name should not contain dots)
586        config
587            .table
588            .validate()
589            .map_err(|e| SinkError::Config(anyhow!(e)))?;
590
591        if config.write_parquet_max_row_group_rows.is_some() {
592            tracing::warn!(
593                "`compaction.write_parquet_max_row_group_rows` is deprecated and ignored; use `compaction.write_parquet_max_row_group_bytes` instead"
594            );
595        }
596
597        Ok(config)
598    }
599
600    pub fn catalog_type(&self) -> &str {
601        self.common.catalog_type()
602    }
603
604    pub async fn load_table(&self) -> Result<Table> {
605        self.common
606            .load_table(&self.table, &self.java_catalog_props)
607            .await
608            .map_err(Into::into)
609    }
610
611    pub async fn create_catalog(&self) -> Result<Arc<dyn Catalog>> {
612        self.common
613            .create_catalog(&self.java_catalog_props)
614            .await
615            .map_err(Into::into)
616    }
617
618    pub fn full_table_name(&self) -> Result<TableIdent> {
619        self.table.to_table_ident().map_err(Into::into)
620    }
621
622    pub fn catalog_name(&self) -> String {
623        self.common.catalog_name()
624    }
625
626    pub fn table_format_version(&self) -> FormatVersion {
627        self.format_version
628    }
629
630    pub fn compaction_interval_sec(&self) -> u64 {
631        // default to 1 hour
632        self.compaction_interval_sec.unwrap_or(3600)
633    }
634
635    /// Calculate the timestamp (in milliseconds) before which snapshots should be expired
636    /// Returns `current_time_ms` - `max_age_millis`
637    pub fn snapshot_expiration_timestamp_ms(&self, current_time_ms: i64) -> Option<i64> {
638        self.snapshot_expiration_max_age_millis
639            .map(|max_age_millis| current_time_ms - max_age_millis)
640    }
641
642    pub fn trigger_snapshot_count(&self) -> usize {
643        self.trigger_snapshot_count.unwrap_or(usize::MAX)
644    }
645
646    pub fn small_files_threshold_mb(&self) -> u64 {
647        self.small_files_threshold_mb.unwrap_or(64)
648    }
649
650    pub fn delete_files_count_threshold(&self) -> usize {
651        self.delete_files_count_threshold.unwrap_or(256)
652    }
653
654    pub fn target_file_size_mb(&self) -> u64 {
655        self.target_file_size_mb.unwrap_or(1024)
656    }
657
658    /// Get the compaction type as an enum
659    /// This method parses the string and returns the enum value
660    pub fn compaction_type(&self) -> CompactionType {
661        self.compaction_type.unwrap_or_default()
662    }
663
664    /// Get the parquet compression codec
665    /// Default is "zstd"
666    pub fn write_parquet_compression(&self) -> &str {
667        self.write_parquet_compression.as_deref().unwrap_or("zstd")
668    }
669
670    /// Get the maximum number of rows in a Parquet row group.
671    pub fn write_parquet_max_row_group_rows(&self) -> Option<usize> {
672        self.write_parquet_max_row_group_rows
673    }
674
675    /// Get the maximum size in bytes of a Parquet row group.
676    pub fn write_parquet_max_row_group_bytes(&self) -> Option<usize> {
677        self.write_parquet_max_row_group_bytes
678            .or(Some(ICEBERG_DEFAULT_WRITE_PARQUET_MAX_ROW_GROUP_BYTES))
679    }
680
681    /// Parse the compression codec string into Parquet Compression enum.
682    /// Invalid values fall back to SNAPPY.
683    pub fn get_parquet_compression(&self) -> Compression {
684        parse_parquet_compression(self.write_parquet_compression())
685    }
686}
687
688/// Parse compression codec string to Parquet Compression enum
689pub fn parse_parquet_compression(codec: &str) -> Compression {
690    match codec.to_lowercase().as_str() {
691        "uncompressed" => Compression::UNCOMPRESSED,
692        "snappy" => Compression::SNAPPY,
693        "gzip" => Compression::GZIP(Default::default()),
694        "lzo" => Compression::LZO,
695        "brotli" => Compression::BROTLI(Default::default()),
696        "lz4" => Compression::LZ4,
697        "zstd" => Compression::ZSTD(Default::default()),
698        _ => {
699            tracing::warn!(
700                "Unknown compression codec '{}', falling back to SNAPPY",
701                codec
702            );
703            Compression::SNAPPY
704        }
705    }
706}
707
// Helper functions for choosing the Iceberg commit branch.

710pub fn commit_branch(sink_type: &str, write_mode: IcebergWriteMode) -> String {
711    if should_enable_iceberg_cow(sink_type, write_mode) {
712        ICEBERG_COW_BRANCH.to_owned()
713    } else {
714        MAIN_BRANCH.to_owned()
715    }
716}
717
718pub fn should_enable_iceberg_cow(sink_type: &str, write_mode: IcebergWriteMode) -> bool {
719    sink_type == SINK_TYPE_UPSERT && write_mode == IcebergWriteMode::CopyOnWrite
720}
721
722impl crate::with_options::WithOptions for IcebergWriteMode {}
723
724impl crate::with_options::WithOptions for FormatVersion {}
725
726impl crate::with_options::WithOptions for CompactionType {}