risingwave_connector/sink/iceberg/
config.rs

1// Copyright 2026 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, HashMap};
16use std::fmt::Debug;
17use std::sync::Arc;
18
19use anyhow::anyhow;
20use iceberg::spec::{FormatVersion, MAIN_BRANCH};
21use iceberg::table::Table;
22use iceberg::{Catalog, TableIdent};
23use parquet::basic::Compression;
24use serde::de::{self, Deserializer, Visitor};
25use serde::{Deserialize, Serialize};
26use serde_with::{DisplayFromStr, serde_as};
27use with_options::WithOptions;
28
29use super::{SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT, SinkError};
30use crate::connector_common::{IcebergCommon, IcebergTableIdentifier};
31use crate::enforce_secret::EnforceSecret;
32use crate::sink::Result;
33use crate::sink::decouple_checkpoint_log_sink::iceberg_default_commit_checkpoint_interval;
34use crate::{deserialize_bool_from_string, deserialize_optional_string_seq_from_string};
35
/// Branch that copy-on-write upsert sinks commit to (see `commit_branch` below);
/// merge-on-read sinks commit to the catalog's `MAIN_BRANCH` instead.
pub const ICEBERG_COW_BRANCH: &str = "ingestion";

/// Accepted values for the `write_mode` option.
pub const ICEBERG_WRITE_MODE_MERGE_ON_READ: &str = "merge-on-read";
pub const ICEBERG_WRITE_MODE_COPY_ON_WRITE: &str = "copy-on-write";
/// Accepted values for the `compaction.type` option.
pub const ICEBERG_COMPACTION_TYPE_FULL: &str = "full";
pub const ICEBERG_COMPACTION_TYPE_SMALL_FILES: &str = "small-files";
pub const ICEBERG_COMPACTION_TYPE_FILES_WITH_DELETE: &str = "files-with-delete";

// NOTE(review): not referenced in this file — presumably the starting field id
// for partition-data fields; confirm at the partition-spec construction site.
pub const PARTITION_DATA_ID_START: i32 = 1000;
45
/// Write mode of the Iceberg sink.
/// Serialized in kebab-case, matching the `write_mode` option values
/// (`merge-on-read` / `copy-on-write`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
pub enum IcebergWriteMode {
    /// Default mode.
    #[default]
    MergeOnRead,
    CopyOnWrite,
}
53
54impl IcebergWriteMode {
55    pub fn as_str(self) -> &'static str {
56        match self {
57            IcebergWriteMode::MergeOnRead => ICEBERG_WRITE_MODE_MERGE_ON_READ,
58            IcebergWriteMode::CopyOnWrite => ICEBERG_WRITE_MODE_COPY_ON_WRITE,
59        }
60    }
61}
62
63impl std::str::FromStr for IcebergWriteMode {
64    type Err = SinkError;
65
66    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
67        match s {
68            ICEBERG_WRITE_MODE_MERGE_ON_READ => Ok(IcebergWriteMode::MergeOnRead),
69            ICEBERG_WRITE_MODE_COPY_ON_WRITE => Ok(IcebergWriteMode::CopyOnWrite),
70            _ => Err(SinkError::Config(anyhow!(format!(
71                "invalid write_mode: {}, must be one of: {}, {}",
72                s, ICEBERG_WRITE_MODE_MERGE_ON_READ, ICEBERG_WRITE_MODE_COPY_ON_WRITE
73            )))),
74        }
75    }
76}
77
78impl TryFrom<&str> for IcebergWriteMode {
79    type Error = <Self as std::str::FromStr>::Err;
80
81    fn try_from(value: &str) -> std::result::Result<Self, Self::Error> {
82        value.parse()
83    }
84}
85
86impl TryFrom<String> for IcebergWriteMode {
87    type Error = <Self as std::str::FromStr>::Err;
88
89    fn try_from(value: String) -> std::result::Result<Self, Self::Error> {
90        value.as_str().parse()
91    }
92}
93
94impl std::fmt::Display for IcebergWriteMode {
95    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
96        f.write_str(self.as_str())
97    }
98}
99
// Configuration option keys, matching the serde renames on `IcebergConfig`.
// NOTE(review): none of these are referenced in this file — presumably used by
// callers (e.g. ALTER handling); confirm at the use sites.
pub const ENABLE_COMPACTION: &str = "enable_compaction";
pub const COMPACTION_INTERVAL_SEC: &str = "compaction_interval_sec";
pub const ENABLE_SNAPSHOT_EXPIRATION: &str = "enable_snapshot_expiration";
pub const WRITE_MODE: &str = "write_mode";
pub const FORMAT_VERSION: &str = "format_version";
pub const SNAPSHOT_EXPIRATION_RETAIN_LAST: &str = "snapshot_expiration_retain_last";
pub const SNAPSHOT_EXPIRATION_MAX_AGE_MILLIS: &str = "snapshot_expiration_max_age_millis";
pub const SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_FILES: &str = "snapshot_expiration_clear_expired_files";
pub const SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_META_DATA: &str =
    "snapshot_expiration_clear_expired_meta_data";
pub const COMPACTION_MAX_SNAPSHOTS_NUM: &str = "compaction.max_snapshots_num";

pub const COMPACTION_SMALL_FILES_THRESHOLD_MB: &str = "compaction.small_files_threshold_mb";

pub const COMPACTION_DELETE_FILES_COUNT_THRESHOLD: &str = "compaction.delete_files_count_threshold";

pub const COMPACTION_TRIGGER_SNAPSHOT_COUNT: &str = "compaction.trigger_snapshot_count";

pub const COMPACTION_TARGET_FILE_SIZE_MB: &str = "compaction.target_file_size_mb";

pub const COMPACTION_TYPE: &str = "compaction.type";

pub const COMPACTION_WRITE_PARQUET_COMPRESSION: &str = "compaction.write_parquet_compression";
pub const COMPACTION_WRITE_PARQUET_MAX_ROW_GROUP_ROWS: &str =
    "compaction.write_parquet_max_row_group_rows";
pub const COMMIT_CHECKPOINT_SIZE_THRESHOLD_MB: &str = "commit_checkpoint_size_threshold_mb";
/// Default for `commit_checkpoint_size_threshold_mb` (128 MB).
pub const ICEBERG_DEFAULT_COMMIT_CHECKPOINT_SIZE_THRESHOLD_MB: u64 = 128;

/// Writer identification ("created by") string embedded in Parquet metadata;
/// `pub(super)` — consumed by sibling modules of the sink, not in this file.
pub(super) const PARQUET_CREATED_BY: &str =
    concat!("risingwave version ", env!("CARGO_PKG_VERSION"));
131
/// Serde default for `commit_retry_num`: 8 retries.
fn default_commit_retry_num() -> u32 {
    8
}
135
/// Serde default for `commit_checkpoint_size_threshold_mb`: 128 MB.
fn default_commit_checkpoint_size_threshold_mb() -> Option<u64> {
    Some(ICEBERG_DEFAULT_COMMIT_CHECKPOINT_SIZE_THRESHOLD_MB)
}
139
/// Serde default for `write_mode`: merge-on-read.
fn default_iceberg_write_mode() -> IcebergWriteMode {
    IcebergWriteMode::MergeOnRead
}
143
/// Serde default for `format_version`: Iceberg format v2.
fn default_iceberg_format_version() -> FormatVersion {
    FormatVersion::V2
}
147
/// Serde default helper: `true`.
fn default_true() -> bool {
    true
}
151
/// Serde default helper: `Some(true)` (used for `is_exactly_once`).
fn default_some_true() -> Option<bool> {
    Some(true)
}
155
156fn parse_format_version_str(value: &str) -> std::result::Result<FormatVersion, String> {
157    let parsed = value
158        .trim()
159        .parse::<u8>()
160        .map_err(|_| "`format-version` must be one of 1, 2, or 3".to_owned())?;
161    match parsed {
162        1 => Ok(FormatVersion::V1),
163        2 => Ok(FormatVersion::V2),
164        3 => Ok(FormatVersion::V3),
165        _ => Err("`format-version` must be one of 1, 2, or 3".to_owned()),
166    }
167}
168
169fn deserialize_format_version<'de, D>(
170    deserializer: D,
171) -> std::result::Result<FormatVersion, D::Error>
172where
173    D: Deserializer<'de>,
174{
175    struct FormatVersionVisitor;
176
177    impl<'de> Visitor<'de> for FormatVersionVisitor {
178        type Value = FormatVersion;
179
180        fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
181            formatter.write_str("format-version as 1, 2, or 3")
182        }
183
184        fn visit_u64<E>(self, value: u64) -> std::result::Result<Self::Value, E>
185        where
186            E: de::Error,
187        {
188            let value = u8::try_from(value)
189                .map_err(|_| E::custom("`format-version` must be one of 1, 2, or 3"))?;
190            parse_format_version_str(&value.to_string()).map_err(E::custom)
191        }
192
193        fn visit_i64<E>(self, value: i64) -> std::result::Result<Self::Value, E>
194        where
195            E: de::Error,
196        {
197            let value = u8::try_from(value)
198                .map_err(|_| E::custom("`format-version` must be one of 1, 2, or 3"))?;
199            parse_format_version_str(&value.to_string()).map_err(E::custom)
200        }
201
202        fn visit_str<E>(self, value: &str) -> std::result::Result<Self::Value, E>
203        where
204            E: de::Error,
205        {
206            parse_format_version_str(value).map_err(E::custom)
207        }
208
209        fn visit_string<E>(self, value: String) -> std::result::Result<Self::Value, E>
210        where
211            E: de::Error,
212        {
213            self.visit_str(&value)
214        }
215    }
216
217    deserializer.deserialize_any(FormatVersionVisitor)
218}
219
/// Compaction type for Iceberg sink.
/// Serialized in kebab-case, matching the `compaction.type` option values
/// (`full`, `small-files`, `files-with-delete`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(rename_all = "kebab-case")]
pub enum CompactionType {
    /// Full compaction - rewrites all data files
    #[default]
    Full,
    /// Small files compaction - only compact small files
    SmallFiles,
    /// Files with delete compaction - only compact files that have associated delete files
    FilesWithDelete,
}
232
233impl CompactionType {
234    pub fn as_str(&self) -> &'static str {
235        match self {
236            CompactionType::Full => ICEBERG_COMPACTION_TYPE_FULL,
237            CompactionType::SmallFiles => ICEBERG_COMPACTION_TYPE_SMALL_FILES,
238            CompactionType::FilesWithDelete => ICEBERG_COMPACTION_TYPE_FILES_WITH_DELETE,
239        }
240    }
241}
242
243impl std::str::FromStr for CompactionType {
244    type Err = SinkError;
245
246    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
247        match s {
248            ICEBERG_COMPACTION_TYPE_FULL => Ok(CompactionType::Full),
249            ICEBERG_COMPACTION_TYPE_SMALL_FILES => Ok(CompactionType::SmallFiles),
250            ICEBERG_COMPACTION_TYPE_FILES_WITH_DELETE => Ok(CompactionType::FilesWithDelete),
251            _ => Err(SinkError::Config(anyhow!(format!(
252                "invalid compaction_type: {}, must be one of: {}, {}, {}",
253                s,
254                ICEBERG_COMPACTION_TYPE_FULL,
255                ICEBERG_COMPACTION_TYPE_SMALL_FILES,
256                ICEBERG_COMPACTION_TYPE_FILES_WITH_DELETE
257            )))),
258        }
259    }
260}
261
262impl TryFrom<&str> for CompactionType {
263    type Error = <Self as std::str::FromStr>::Err;
264
265    fn try_from(value: &str) -> std::result::Result<Self, Self::Error> {
266        value.parse()
267    }
268}
269
270impl TryFrom<String> for CompactionType {
271    type Error = <Self as std::str::FromStr>::Err;
272
273    fn try_from(value: String) -> std::result::Result<Self, Self::Error> {
274        value.as_str().parse()
275    }
276}
277
278impl std::fmt::Display for CompactionType {
279    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
280        write!(f, "{}", self.as_str())
281    }
282}
283
#[serde_as]
#[derive(Debug, Clone, PartialEq, Eq, Deserialize, WithOptions)]
pub struct IcebergConfig {
    /// Sink type; must be `append-only` or `upsert` (validated in `from_btreemap`).
    pub r#type: String, // accept "append-only" or "upsert"

    /// Force append-only writes; parsed from a string-valued option.
    #[serde(default, deserialize_with = "deserialize_bool_from_string")]
    pub force_append_only: bool,

    /// Shared Iceberg connection/catalog options, flattened into the top level.
    #[serde(flatten)]
    pub(crate) common: IcebergCommon,

    /// Target table identifier options, flattened into the top level.
    #[serde(flatten)]
    pub(crate) table: IcebergTableIdentifier,

    /// Primary key columns; required and non-empty for `upsert` sinks.
    #[serde(
        rename = "primary_key",
        default,
        deserialize_with = "deserialize_optional_string_seq_from_string"
    )]
    pub primary_key: Option<Vec<String>>,

    /// Properties forwarded to the java catalog. Never deserialized directly;
    /// populated in `from_btreemap` from `catalog.*` options (minus reserved keys).
    #[serde(skip)]
    pub java_catalog_props: HashMap<String, String>,

    /// Optional partition spec as a raw string (interpreted outside this file).
    #[serde(default)]
    pub partition_by: Option<String>,

    /// Commit every n(>0) checkpoints, default is 60.
    #[serde(default = "iceberg_default_commit_checkpoint_interval")]
    #[serde_as(as = "DisplayFromStr")]
    #[with_option(allow_alter_on_fly)]
    pub commit_checkpoint_interval: u64,

    /// Commit on the next checkpoint barrier after buffered write size exceeds this threshold.
    /// Default is 128 MB.
    #[serde(default = "default_commit_checkpoint_size_threshold_mb")]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    pub commit_checkpoint_size_threshold_mb: Option<u64>,

    /// Whether to create the Iceberg table when it does not already exist.
    #[serde(default, deserialize_with = "deserialize_bool_from_string")]
    pub create_table_if_not_exists: bool,

    /// Whether it is `exactly_once`, the default is true.
    #[serde(default = "default_some_true")]
    #[serde_as(as = "Option<DisplayFromStr>")]
    pub is_exactly_once: Option<bool>,
    // Retry commit num when iceberg commit fail. default is 8.
    // # TODO
    // Iceberg table may store the retry commit num in table meta.
    // We should try to find and use that as default commit retry num first.
    #[serde(default = "default_commit_retry_num")]
    pub commit_retry_num: u32,

    /// Whether to enable iceberg compaction.
    #[serde(
        rename = "enable_compaction",
        default,
        deserialize_with = "deserialize_bool_from_string"
    )]
    #[with_option(allow_alter_on_fly)]
    pub enable_compaction: bool,

    /// The interval of iceberg compaction, in seconds (defaults to 3600 when unset).
    #[serde(rename = "compaction_interval_sec", default)]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    pub compaction_interval_sec: Option<u64>,

    /// Whether to enable iceberg expired snapshots.
    #[serde(
        rename = "enable_snapshot_expiration",
        default,
        deserialize_with = "deserialize_bool_from_string"
    )]
    #[with_option(allow_alter_on_fly)]
    pub enable_snapshot_expiration: bool,

    /// The iceberg write mode, can be `merge-on-read` or `copy-on-write`.
    #[serde(rename = "write_mode", default = "default_iceberg_write_mode")]
    pub write_mode: IcebergWriteMode,

    /// Iceberg format version for table creation.
    /// Accepts 1, 2, or 3, given either as a number or as a string.
    #[serde(
        rename = "format_version",
        default = "default_iceberg_format_version",
        deserialize_with = "deserialize_format_version"
    )]
    pub format_version: FormatVersion,

    /// The maximum age (in milliseconds) for snapshots before they expire
    /// For example, if set to 3600000, snapshots older than 1 hour will be expired
    #[serde(rename = "snapshot_expiration_max_age_millis", default)]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    pub snapshot_expiration_max_age_millis: Option<i64>,

    /// The number of snapshots to retain
    #[serde(rename = "snapshot_expiration_retain_last", default)]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    pub snapshot_expiration_retain_last: Option<i32>,

    /// Whether snapshot expiration also removes the expired files (default: true).
    #[serde(
        rename = "snapshot_expiration_clear_expired_files",
        default = "default_true",
        deserialize_with = "deserialize_bool_from_string"
    )]
    #[with_option(allow_alter_on_fly)]
    pub snapshot_expiration_clear_expired_files: bool,

    /// Whether snapshot expiration also removes expired metadata (default: true).
    #[serde(
        rename = "snapshot_expiration_clear_expired_meta_data",
        default = "default_true",
        deserialize_with = "deserialize_bool_from_string"
    )]
    #[with_option(allow_alter_on_fly)]
    pub snapshot_expiration_clear_expired_meta_data: bool,

    /// The maximum number of snapshots allowed since the last rewrite operation
    /// If set, sink will check snapshot count and wait if exceeded
    #[serde(rename = "compaction.max_snapshots_num", default)]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    pub max_snapshots_num_before_compaction: Option<usize>,

    /// Size threshold (MB) for small-files compaction; defaults to 64 when unset.
    #[serde(rename = "compaction.small_files_threshold_mb", default)]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    pub small_files_threshold_mb: Option<u64>,

    /// Delete-file count threshold for compaction; defaults to 256 when unset.
    #[serde(rename = "compaction.delete_files_count_threshold", default)]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    pub delete_files_count_threshold: Option<usize>,

    /// Snapshot count that triggers compaction; effectively disabled
    /// (`usize::MAX`) when unset.
    #[serde(rename = "compaction.trigger_snapshot_count", default)]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    pub trigger_snapshot_count: Option<usize>,

    /// Target output file size (MB) for compaction; defaults to 1024 when unset.
    #[serde(rename = "compaction.target_file_size_mb", default)]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    pub target_file_size_mb: Option<u64>,

    /// Compaction type: `full`, `small-files`, or `files-with-delete`
    /// If not set, will default to `full`
    #[serde(rename = "compaction.type", default)]
    #[with_option(allow_alter_on_fly)]
    pub compaction_type: Option<CompactionType>,

    /// Parquet compression codec
    /// Supported values: uncompressed, snappy, gzip, lzo, brotli, lz4, zstd
    /// Defaults to zstd when unset (see `write_parquet_compression()`);
    /// unrecognized values fall back to snappy with a warning at parse time.
    #[serde(rename = "compaction.write_parquet_compression", default)]
    #[with_option(allow_alter_on_fly)]
    pub write_parquet_compression: Option<String>,

    /// Maximum number of rows in a Parquet row group
    /// Default is 122880 (from developer config)
    #[serde(rename = "compaction.write_parquet_max_row_group_rows", default)]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    pub write_parquet_max_row_group_rows: Option<usize>,
}
451
452impl EnforceSecret for IcebergConfig {
453    fn enforce_secret<'a>(
454        prop_iter: impl Iterator<Item = &'a str>,
455    ) -> crate::error::ConnectorResult<()> {
456        for prop in prop_iter {
457            IcebergCommon::enforce_one(prop)?;
458        }
459        Ok(())
460    }
461
462    fn enforce_one(prop: &str) -> crate::error::ConnectorResult<()> {
463        IcebergCommon::enforce_one(prop)
464    }
465}
466
467impl IcebergConfig {
468    /// Validate that append-only sinks use merge-on-read mode
469    /// Copy-on-write is strictly worse than merge-on-read for append-only workloads
470    pub fn validate_append_only_write_mode(
471        sink_type: &str,
472        write_mode: IcebergWriteMode,
473    ) -> Result<()> {
474        if sink_type == SINK_TYPE_APPEND_ONLY && write_mode == IcebergWriteMode::CopyOnWrite {
475            return Err(SinkError::Config(anyhow!(
476                "'copy-on-write' mode is not supported for append-only iceberg sink. \
477                 Please use 'merge-on-read' instead, which is strictly better for append-only workloads."
478            )));
479        }
480        Ok(())
481    }
482
483    pub fn from_btreemap(values: BTreeMap<String, String>) -> Result<Self> {
484        let mut config =
485            serde_json::from_value::<IcebergConfig>(serde_json::to_value(&values).unwrap())
486                .map_err(|e| SinkError::Config(anyhow!(e)))?;
487
488        if config.r#type != SINK_TYPE_APPEND_ONLY && config.r#type != SINK_TYPE_UPSERT {
489            return Err(SinkError::Config(anyhow!(
490                "`{}` must be {}, or {}",
491                SINK_TYPE_OPTION,
492                SINK_TYPE_APPEND_ONLY,
493                SINK_TYPE_UPSERT
494            )));
495        }
496
497        if config.r#type == SINK_TYPE_UPSERT {
498            if let Some(primary_key) = &config.primary_key {
499                if primary_key.is_empty() {
500                    return Err(SinkError::Config(anyhow!(
501                        "`primary-key` must not be empty in {}",
502                        SINK_TYPE_UPSERT
503                    )));
504                }
505            } else {
506                return Err(SinkError::Config(anyhow!(
507                    "Must set `primary-key` in {}",
508                    SINK_TYPE_UPSERT
509                )));
510            }
511        }
512
513        // Enforce merge-on-read for append-only sinks
514        Self::validate_append_only_write_mode(&config.r#type, config.write_mode)?;
515
516        // All configs start with "catalog." will be treated as java configs.
517        config.java_catalog_props = values
518            .iter()
519            .filter(|(k, _v)| {
520                k.starts_with("catalog.")
521                    && k != &"catalog.uri"
522                    && k != &"catalog.type"
523                    && k != &"catalog.name"
524                    && k != &"catalog.header"
525            })
526            .map(|(k, v)| (k[8..].to_string(), v.clone()))
527            .collect();
528
529        if config.commit_checkpoint_interval == 0 {
530            return Err(SinkError::Config(anyhow!(
531                "`commit-checkpoint-interval` must be greater than 0"
532            )));
533        }
534
535        if config.commit_checkpoint_size_threshold_mb == Some(0) {
536            return Err(SinkError::Config(anyhow!(
537                "`commit_checkpoint_size_threshold_mb` must be greater than 0"
538            )));
539        }
540
541        // Validate table identifier (e.g., database.name should not contain dots)
542        config
543            .table
544            .validate()
545            .map_err(|e| SinkError::Config(anyhow!(e)))?;
546
547        Ok(config)
548    }
549
550    pub fn catalog_type(&self) -> &str {
551        self.common.catalog_type()
552    }
553
554    pub async fn load_table(&self) -> Result<Table> {
555        self.common
556            .load_table(&self.table, &self.java_catalog_props)
557            .await
558            .map_err(Into::into)
559    }
560
561    pub async fn create_catalog(&self) -> Result<Arc<dyn Catalog>> {
562        self.common
563            .create_catalog(&self.java_catalog_props)
564            .await
565            .map_err(Into::into)
566    }
567
568    pub fn full_table_name(&self) -> Result<TableIdent> {
569        self.table.to_table_ident().map_err(Into::into)
570    }
571
572    pub fn catalog_name(&self) -> String {
573        self.common.catalog_name()
574    }
575
576    pub fn table_format_version(&self) -> FormatVersion {
577        self.format_version
578    }
579
580    pub fn compaction_interval_sec(&self) -> u64 {
581        // default to 1 hour
582        self.compaction_interval_sec.unwrap_or(3600)
583    }
584
585    /// Calculate the timestamp (in milliseconds) before which snapshots should be expired
586    /// Returns `current_time_ms` - `max_age_millis`
587    pub fn snapshot_expiration_timestamp_ms(&self, current_time_ms: i64) -> Option<i64> {
588        self.snapshot_expiration_max_age_millis
589            .map(|max_age_millis| current_time_ms - max_age_millis)
590    }
591
592    pub fn trigger_snapshot_count(&self) -> usize {
593        self.trigger_snapshot_count.unwrap_or(usize::MAX)
594    }
595
596    pub fn small_files_threshold_mb(&self) -> u64 {
597        self.small_files_threshold_mb.unwrap_or(64)
598    }
599
600    pub fn delete_files_count_threshold(&self) -> usize {
601        self.delete_files_count_threshold.unwrap_or(256)
602    }
603
604    pub fn target_file_size_mb(&self) -> u64 {
605        self.target_file_size_mb.unwrap_or(1024)
606    }
607
608    pub fn commit_checkpoint_size_threshold_bytes(&self) -> Option<u64> {
609        self.commit_checkpoint_size_threshold_mb
610            .map(|threshold_mb| threshold_mb.saturating_mul(1024 * 1024))
611    }
612
613    /// Get the compaction type as an enum
614    /// This method parses the string and returns the enum value
615    pub fn compaction_type(&self) -> CompactionType {
616        self.compaction_type.unwrap_or_default()
617    }
618
619    /// Get the parquet compression codec
620    /// Default is "zstd"
621    pub fn write_parquet_compression(&self) -> &str {
622        self.write_parquet_compression.as_deref().unwrap_or("zstd")
623    }
624
625    /// Get the maximum number of rows in a Parquet row group
626    /// Default is 122880 (from developer config default)
627    pub fn write_parquet_max_row_group_rows(&self) -> usize {
628        self.write_parquet_max_row_group_rows.unwrap_or(122880)
629    }
630
631    /// Parse the compression codec string into Parquet Compression enum
632    /// Returns SNAPPY as default if parsing fails or not specified
633    pub fn get_parquet_compression(&self) -> Compression {
634        parse_parquet_compression(self.write_parquet_compression())
635    }
636}
637
638/// Parse compression codec string to Parquet Compression enum
639pub fn parse_parquet_compression(codec: &str) -> Compression {
640    match codec.to_lowercase().as_str() {
641        "uncompressed" => Compression::UNCOMPRESSED,
642        "snappy" => Compression::SNAPPY,
643        "gzip" => Compression::GZIP(Default::default()),
644        "lzo" => Compression::LZO,
645        "brotli" => Compression::BROTLI(Default::default()),
646        "lz4" => Compression::LZ4,
647        "zstd" => Compression::ZSTD(Default::default()),
648        _ => {
649            tracing::warn!(
650                "Unknown compression codec '{}', falling back to SNAPPY",
651                codec
652            );
653            Compression::SNAPPY
654        }
655    }
656}
657
658// Helper Functions
659
660pub fn commit_branch(sink_type: &str, write_mode: IcebergWriteMode) -> String {
661    if should_enable_iceberg_cow(sink_type, write_mode) {
662        ICEBERG_COW_BRANCH.to_owned()
663    } else {
664        MAIN_BRANCH.to_owned()
665    }
666}
667
668pub fn should_enable_iceberg_cow(sink_type: &str, write_mode: IcebergWriteMode) -> bool {
669    sink_type == SINK_TYPE_UPSERT && write_mode == IcebergWriteMode::CopyOnWrite
670}
671
// Empty marker impl so `IcebergWriteMode` can be used as a `WithOptions` field type.
impl crate::with_options::WithOptions for IcebergWriteMode {}
673
// Empty marker impl so `FormatVersion` can be used as a `WithOptions` field type.
impl crate::with_options::WithOptions for FormatVersion {}
675
// Empty marker impl so `CompactionType` can be used as a `WithOptions` field type.
impl crate::with_options::WithOptions for CompactionType {}