risingwave_connector/sink/iceberg/config.rs

// Copyright 2026 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeMap, HashMap};
use std::fmt::Debug;
use std::sync::Arc;

use anyhow::anyhow;
use iceberg::spec::{FormatVersion, MAIN_BRANCH};
use iceberg::table::Table;
use iceberg::{Catalog, TableIdent};
use parquet::basic::Compression;
use serde::de::{self, Deserializer, Visitor};
use serde::{Deserialize, Serialize};
use serde_with::{DisplayFromStr, serde_as};
use with_options::WithOptions;

use super::{SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT, SinkError};
use crate::connector_common::{IcebergCommon, IcebergTableIdentifier};
use crate::enforce_secret::EnforceSecret;
use crate::sink::Result;
use crate::sink::decouple_checkpoint_log_sink::iceberg_default_commit_checkpoint_interval;
use crate::{deserialize_bool_from_string, deserialize_optional_string_seq_from_string};

pub const ICEBERG_COW_BRANCH: &str = "ingestion";

pub const ICEBERG_WRITE_MODE_MERGE_ON_READ: &str = "merge-on-read";
pub const ICEBERG_WRITE_MODE_COPY_ON_WRITE: &str = "copy-on-write";
pub const ICEBERG_COMPACTION_TYPE_FULL: &str = "full";
pub const ICEBERG_COMPACTION_TYPE_SMALL_FILES: &str = "small-files";
pub const ICEBERG_COMPACTION_TYPE_FILES_WITH_DELETE: &str = "files-with-delete";

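/// First field ID assigned to partition data fields; the Iceberg spec convention is to
/// start partition field IDs at 1000.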
pub const PARTITION_DATA_ID_START: i32 = 1000;

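/// Write mode of the Iceberg sink; accepted values are `merge-on-read` (the default)
/// and `copy-on-write`.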
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
pub enum IcebergWriteMode {
    #[default]
    MergeOnRead,
    CopyOnWrite,
}

impl IcebergWriteMode {
    pub fn as_str(self) -> &'static str {
        match self {
            IcebergWriteMode::MergeOnRead => ICEBERG_WRITE_MODE_MERGE_ON_READ,
            IcebergWriteMode::CopyOnWrite => ICEBERG_WRITE_MODE_COPY_ON_WRITE,
        }
    }
}

impl std::str::FromStr for IcebergWriteMode {
    type Err = SinkError;

    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
        match s {
            ICEBERG_WRITE_MODE_MERGE_ON_READ => Ok(IcebergWriteMode::MergeOnRead),
            ICEBERG_WRITE_MODE_COPY_ON_WRITE => Ok(IcebergWriteMode::CopyOnWrite),
            _ => Err(SinkError::Config(anyhow!(
                "invalid write_mode: {}, must be one of: {}, {}",
                s, ICEBERG_WRITE_MODE_MERGE_ON_READ, ICEBERG_WRITE_MODE_COPY_ON_WRITE
            ))),
        }
    }
}

impl TryFrom<&str> for IcebergWriteMode {
    type Error = <Self as std::str::FromStr>::Err;

    fn try_from(value: &str) -> std::result::Result<Self, Self::Error> {
        value.parse()
    }
}

impl TryFrom<String> for IcebergWriteMode {
    type Error = <Self as std::str::FromStr>::Err;

    fn try_from(value: String) -> std::result::Result<Self, Self::Error> {
        value.as_str().parse()
    }
}

impl std::fmt::Display for IcebergWriteMode {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(self.as_str())
    }
}

// Configuration constants
pub const ENABLE_COMPACTION: &str = "enable_compaction";
pub const COMPACTION_INTERVAL_SEC: &str = "compaction_interval_sec";
pub const ENABLE_SNAPSHOT_EXPIRATION: &str = "enable_snapshot_expiration";
pub const WRITE_MODE: &str = "write_mode";
pub const FORMAT_VERSION: &str = "format_version";
pub const SNAPSHOT_EXPIRATION_RETAIN_LAST: &str = "snapshot_expiration_retain_last";
pub const SNAPSHOT_EXPIRATION_MAX_AGE_MILLIS: &str = "snapshot_expiration_max_age_millis";
pub const SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_FILES: &str = "snapshot_expiration_clear_expired_files";
pub const SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_META_DATA: &str =
    "snapshot_expiration_clear_expired_meta_data";
pub const COMPACTION_MAX_SNAPSHOTS_NUM: &str = "compaction.max_snapshots_num";

pub const COMPACTION_SMALL_FILES_THRESHOLD_MB: &str = "compaction.small_files_threshold_mb";

pub const COMPACTION_DELETE_FILES_COUNT_THRESHOLD: &str = "compaction.delete_files_count_threshold";

pub const COMPACTION_TRIGGER_SNAPSHOT_COUNT: &str = "compaction.trigger_snapshot_count";

pub const COMPACTION_TARGET_FILE_SIZE_MB: &str = "compaction.target_file_size_mb";

pub const COMPACTION_TYPE: &str = "compaction.type";

pub const COMPACTION_WRITE_PARQUET_COMPRESSION: &str = "compaction.write_parquet_compression";
pub const COMPACTION_WRITE_PARQUET_MAX_ROW_GROUP_ROWS: &str =
    "compaction.write_parquet_max_row_group_rows";
pub const COMPACTION_WRITE_PARQUET_MAX_ROW_GROUP_BYTES: &str =
    "compaction.write_parquet_max_row_group_bytes";
pub const COMMIT_CHECKPOINT_SIZE_THRESHOLD_MB: &str = "commit_checkpoint_size_threshold_mb";
pub const ORDER_KEY: &str = "order_key";
pub const ICEBERG_DEFAULT_COMMIT_CHECKPOINT_SIZE_THRESHOLD_MB: u64 = 128;
pub const ICEBERG_DEFAULT_WRITE_PARQUET_MAX_ROW_GROUP_BYTES: usize = 128 * 1024 * 1024;
pub const ENABLE_PK_INDEX: &str = "enable_pk_index";

pub(super) const PARQUET_CREATED_BY: &str =
    concat!("risingwave version ", env!("CARGO_PKG_VERSION"));

fn default_commit_retry_num() -> u32 {
    8
}

fn default_commit_checkpoint_size_threshold_mb() -> Option<u64> {
    Some(ICEBERG_DEFAULT_COMMIT_CHECKPOINT_SIZE_THRESHOLD_MB)
}

fn default_iceberg_write_mode() -> IcebergWriteMode {
    IcebergWriteMode::MergeOnRead
}

fn default_iceberg_format_version() -> FormatVersion {
    FormatVersion::V2
}

fn default_true() -> bool {
    true
}

fn default_some_true() -> Option<bool> {
    Some(true)
}

fn parse_format_version_str(value: &str) -> std::result::Result<FormatVersion, String> {
    let parsed = value
        .trim()
        .parse::<u8>()
        .map_err(|_| "`format-version` must be one of 1, 2, or 3".to_owned())?;
    match parsed {
        1 => Ok(FormatVersion::V1),
        2 => Ok(FormatVersion::V2),
        3 => Ok(FormatVersion::V3),
        _ => Err("`format-version` must be one of 1, 2, or 3".to_owned()),
    }
}

fn deserialize_format_version<'de, D>(
    deserializer: D,
) -> std::result::Result<FormatVersion, D::Error>
where
    D: Deserializer<'de>,
{
    struct FormatVersionVisitor;

    impl<'de> Visitor<'de> for FormatVersionVisitor {
        type Value = FormatVersion;

        fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
            formatter.write_str("format-version as 1, 2, or 3")
        }

        fn visit_u64<E>(self, value: u64) -> std::result::Result<Self::Value, E>
        where
            E: de::Error,
        {
            let value = u8::try_from(value)
                .map_err(|_| E::custom("`format-version` must be one of 1, 2, or 3"))?;
            parse_format_version_str(&value.to_string()).map_err(E::custom)
        }

        fn visit_i64<E>(self, value: i64) -> std::result::Result<Self::Value, E>
        where
            E: de::Error,
        {
            let value = u8::try_from(value)
                .map_err(|_| E::custom("`format-version` must be one of 1, 2, or 3"))?;
            parse_format_version_str(&value.to_string()).map_err(E::custom)
        }

        fn visit_str<E>(self, value: &str) -> std::result::Result<Self::Value, E>
        where
            E: de::Error,
        {
            parse_format_version_str(value).map_err(E::custom)
        }

        fn visit_string<E>(self, value: String) -> std::result::Result<Self::Value, E>
        where
            E: de::Error,
        {
            self.visit_str(&value)
        }
    }

    deserializer.deserialize_any(FormatVersionVisitor)
}

/// Compaction type for Iceberg sink
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(rename_all = "kebab-case")]
pub enum CompactionType {
    /// Full compaction - rewrites all data files
    #[default]
    Full,
    /// Small files compaction - only compact small files
    SmallFiles,
    /// Files with delete compaction - only compact files that have associated delete files
    FilesWithDelete,
}

impl CompactionType {
    pub fn as_str(&self) -> &'static str {
        match self {
            CompactionType::Full => ICEBERG_COMPACTION_TYPE_FULL,
            CompactionType::SmallFiles => ICEBERG_COMPACTION_TYPE_SMALL_FILES,
            CompactionType::FilesWithDelete => ICEBERG_COMPACTION_TYPE_FILES_WITH_DELETE,
        }
    }
}

impl std::str::FromStr for CompactionType {
    type Err = SinkError;

    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
        match s {
            ICEBERG_COMPACTION_TYPE_FULL => Ok(CompactionType::Full),
            ICEBERG_COMPACTION_TYPE_SMALL_FILES => Ok(CompactionType::SmallFiles),
            ICEBERG_COMPACTION_TYPE_FILES_WITH_DELETE => Ok(CompactionType::FilesWithDelete),
            _ => Err(SinkError::Config(anyhow!(
                "invalid compaction_type: {}, must be one of: {}, {}, {}",
                s,
                ICEBERG_COMPACTION_TYPE_FULL,
                ICEBERG_COMPACTION_TYPE_SMALL_FILES,
                ICEBERG_COMPACTION_TYPE_FILES_WITH_DELETE
            ))),
        }
    }
}

impl TryFrom<&str> for CompactionType {
    type Error = <Self as std::str::FromStr>::Err;

    fn try_from(value: &str) -> std::result::Result<Self, Self::Error> {
        value.parse()
    }
}

impl TryFrom<String> for CompactionType {
    type Error = <Self as std::str::FromStr>::Err;

    fn try_from(value: String) -> std::result::Result<Self, Self::Error> {
        value.as_str().parse()
    }
}

impl std::fmt::Display for CompactionType {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}", self.as_str())
    }
}

#[serde_as]
#[derive(Debug, Clone, PartialEq, Eq, Deserialize, WithOptions)]
pub struct IcebergConfig {
    pub r#type: String, // accept "append-only" or "upsert"

    #[serde(default, deserialize_with = "deserialize_bool_from_string")]
    pub force_append_only: bool,

    #[serde(flatten)]
    pub(crate) common: IcebergCommon,

    #[serde(flatten)]
    pub(crate) table: IcebergTableIdentifier,

    #[serde(
        rename = "primary_key",
        default,
        deserialize_with = "deserialize_optional_string_seq_from_string"
    )]
    pub primary_key: Option<Vec<String>>,

    // Properties passed through to the Java catalog.
    #[serde(skip)]
    pub java_catalog_props: HashMap<String, String>,

    #[serde(default)]
    pub partition_by: Option<String>,

    #[serde(default)]
    pub order_key: Option<String>,

    /// Commit every n (n > 0) checkpoints; the default is 60.
    #[serde(default = "iceberg_default_commit_checkpoint_interval")]
    #[serde_as(as = "DisplayFromStr")]
    #[with_option(allow_alter_on_fly)]
    pub commit_checkpoint_interval: u64,

    /// Commit on the next checkpoint barrier after the buffered write size exceeds this threshold.
    /// The default is 128 MB.
    #[serde(default = "default_commit_checkpoint_size_threshold_mb")]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    pub commit_checkpoint_size_threshold_mb: Option<u64>,

    #[serde(default, deserialize_with = "deserialize_bool_from_string")]
    pub create_table_if_not_exists: bool,

    /// Whether to commit with exactly-once semantics; the default is true.
    #[serde(default = "default_some_true")]
    #[serde_as(as = "Option<DisplayFromStr>")]
    pub is_exactly_once: Option<bool>,
    // Number of commit retries when an Iceberg commit fails; the default is 8.
    // # TODO
    // The Iceberg table may store the commit retry count in its metadata.
    // We should try to find and use that as the default commit retry count first.
    #[serde(default = "default_commit_retry_num")]
    pub commit_retry_num: u32,

    /// Whether to enable Iceberg compaction.
    #[serde(
        rename = "enable_compaction",
        default,
        deserialize_with = "deserialize_bool_from_string"
    )]
    #[with_option(allow_alter_on_fly)]
    pub enable_compaction: bool,

    /// The interval (in seconds) between Iceberg compaction runs.
    #[serde(rename = "compaction_interval_sec", default)]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    pub compaction_interval_sec: Option<u64>,

    /// Whether to enable Iceberg snapshot expiration.
    #[serde(
        rename = "enable_snapshot_expiration",
        default,
        deserialize_with = "deserialize_bool_from_string"
    )]
    #[with_option(allow_alter_on_fly)]
    pub enable_snapshot_expiration: bool,

    /// The Iceberg write mode; either `merge-on-read` or `copy-on-write`.
    #[serde(rename = "write_mode", default = "default_iceberg_write_mode")]
    pub write_mode: IcebergWriteMode,

    /// Iceberg format version for table creation.
    #[serde(
        rename = "format_version",
        default = "default_iceberg_format_version",
        deserialize_with = "deserialize_format_version"
    )]
    pub format_version: FormatVersion,

    /// The maximum age (in milliseconds) of snapshots before they expire.
    /// For example, if set to 3600000, snapshots older than 1 hour are expired.
    #[serde(rename = "snapshot_expiration_max_age_millis", default)]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    pub snapshot_expiration_max_age_millis: Option<i64>,

    /// The number of most recent snapshots to retain.
    #[serde(rename = "snapshot_expiration_retain_last", default)]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    pub snapshot_expiration_retain_last: Option<i32>,

    #[serde(
        rename = "snapshot_expiration_clear_expired_files",
        default = "default_true",
        deserialize_with = "deserialize_bool_from_string"
    )]
    #[with_option(allow_alter_on_fly)]
    pub snapshot_expiration_clear_expired_files: bool,

    #[serde(
        rename = "snapshot_expiration_clear_expired_meta_data",
        default = "default_true",
        deserialize_with = "deserialize_bool_from_string"
    )]
    #[with_option(allow_alter_on_fly)]
    pub snapshot_expiration_clear_expired_meta_data: bool,

    /// The maximum number of snapshots allowed since the last rewrite operation.
    /// If set, the sink checks the snapshot count and waits when it is exceeded.
    #[serde(rename = "compaction.max_snapshots_num", default)]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    pub max_snapshots_num_before_compaction: Option<usize>,

    #[serde(rename = "compaction.small_files_threshold_mb", default)]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    pub small_files_threshold_mb: Option<u64>,

    #[serde(rename = "compaction.delete_files_count_threshold", default)]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    pub delete_files_count_threshold: Option<usize>,

    #[serde(rename = "compaction.trigger_snapshot_count", default)]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    pub trigger_snapshot_count: Option<usize>,

    #[serde(rename = "compaction.target_file_size_mb", default)]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    pub target_file_size_mb: Option<u64>,

    /// Compaction type: `full`, `small-files`, or `files-with-delete`.
    /// Defaults to `full` if not set.
    #[serde(rename = "compaction.type", default)]
    #[with_option(allow_alter_on_fly)]
    pub compaction_type: Option<CompactionType>,

    /// Parquet compression codec.
    /// Supported values: uncompressed, snappy, gzip, lzo, brotli, lz4, zstd.
    /// The default is zstd.
    #[serde(rename = "compaction.write_parquet_compression", default)]
    #[with_option(allow_alter_on_fly)]
    pub write_parquet_compression: Option<String>,

    /// Deprecated: maximum number of rows in a Parquet row group.
    /// Accepted for backward compatibility, but ignored by the writer.
    #[serde(rename = "compaction.write_parquet_max_row_group_rows", default)]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    pub write_parquet_max_row_group_rows: Option<usize>,

    /// Maximum size of a Parquet row group in bytes.
    /// The default is 128 `MiB`, matching Iceberg defaults.
    #[serde(rename = "compaction.write_parquet_max_row_group_bytes", default)]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    pub write_parquet_max_row_group_bytes: Option<usize>,

    /// Whether to enable the PK index for the upsert sink. The default is false.
    /// It is used by the V3 upsert Iceberg sink to generate delete vectors.
    #[serde(
        rename = "enable_pk_index",
        default,
        deserialize_with = "deserialize_bool_from_string"
    )]
    pub enable_pk_index: bool,
}

impl EnforceSecret for IcebergConfig {
    fn enforce_secret<'a>(
        prop_iter: impl Iterator<Item = &'a str>,
    ) -> crate::error::ConnectorResult<()> {
        for prop in prop_iter {
            IcebergCommon::enforce_one(prop)?;
        }
        Ok(())
    }

    fn enforce_one(prop: &str) -> crate::error::ConnectorResult<()> {
        IcebergCommon::enforce_one(prop)
    }
}

impl IcebergConfig {
    /// Validate that append-only sinks use merge-on-read mode.
    /// Copy-on-write is strictly worse than merge-on-read for append-only workloads.
    pub fn validate_append_only_write_mode(
        sink_type: &str,
        write_mode: IcebergWriteMode,
    ) -> Result<()> {
        if sink_type == SINK_TYPE_APPEND_ONLY && write_mode == IcebergWriteMode::CopyOnWrite {
            return Err(SinkError::Config(anyhow!(
                "'copy-on-write' mode is not supported for append-only iceberg sink. \
                 Please use 'merge-on-read' instead, which is strictly better for append-only workloads."
            )));
        }
        Ok(())
    }

    pub(crate) fn validate_enable_pk_index(&self) -> Result<()> {
        if !self.enable_pk_index {
            return Ok(());
        }

        if self.r#type != SINK_TYPE_UPSERT {
            return Err(SinkError::Config(anyhow!(
                "`enable_pk_index` is only supported for upsert iceberg sink"
            )));
        }

        if self.write_mode != IcebergWriteMode::MergeOnRead {
            return Err(SinkError::Config(anyhow!(
                "`enable_pk_index` is only supported for upsert iceberg sink with merge-on-read mode"
            )));
        }

        if self.format_version < FormatVersion::V3 {
            return Err(SinkError::Config(anyhow!(
                "`enable_pk_index` is only supported for upsert iceberg sink with format version >= 3"
            )));
        }

        if self.force_append_only {
            return Err(SinkError::Config(anyhow!(
                "`enable_pk_index` cannot be true when `force_append_only` is true"
            )));
        }

        Ok(())
    }

    pub fn from_btreemap(values: BTreeMap<String, String>) -> Result<Self> {
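        // Round-trip through serde_json so the string-typed WITH options are converted by
        // the `serde_as`/`deserialize_with` attributes on `IcebergConfig`.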
        let mut config =
            serde_json::from_value::<IcebergConfig>(serde_json::to_value(&values).unwrap())
                .map_err(|e| SinkError::Config(anyhow!(e)))?;

        if config.r#type != SINK_TYPE_APPEND_ONLY && config.r#type != SINK_TYPE_UPSERT {
            return Err(SinkError::Config(anyhow!(
                "`{}` must be {} or {}",
                SINK_TYPE_OPTION,
                SINK_TYPE_APPEND_ONLY,
                SINK_TYPE_UPSERT
            )));
        }

        if config.r#type == SINK_TYPE_UPSERT {
            if let Some(primary_key) = &config.primary_key {
                if primary_key.is_empty() {
                    return Err(SinkError::Config(anyhow!(
                        "`primary_key` must not be empty in {}",
                        SINK_TYPE_UPSERT
                    )));
                }
            } else {
                return Err(SinkError::Config(anyhow!(
                    "Must set `primary_key` in {}",
                    SINK_TYPE_UPSERT
                )));
            }
        }

        // Enforce merge-on-read for append-only sinks
        Self::validate_append_only_write_mode(&config.r#type, config.write_mode)?;
        config.validate_enable_pk_index()?;

        // All configs starting with "catalog." are treated as Java catalog configs.
        config.java_catalog_props = values
            .iter()
            .filter(|(k, _v)| {
                k.starts_with("catalog.")
                    && k != &"catalog.uri"
                    && k != &"catalog.type"
                    && k != &"catalog.name"
                    && k != &"catalog.header"
            })
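            // Strip the leading "catalog." prefix (8 characters) from each key.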
            .map(|(k, v)| (k[8..].to_string(), v.clone()))
            .collect();

        if config.commit_checkpoint_interval == 0 {
            return Err(SinkError::Config(anyhow!(
                "`commit_checkpoint_interval` must be greater than 0"
            )));
        }

        if config.commit_checkpoint_size_threshold_mb == Some(0) {
            return Err(SinkError::Config(anyhow!(
                "`commit_checkpoint_size_threshold_mb` must be greater than 0"
            )));
        }

        if config.trigger_snapshot_count == Some(0) {
            return Err(SinkError::Config(anyhow!(
                "`compaction.trigger_snapshot_count` must be greater than 0"
            )));
        }

        // Validate table identifier (e.g., database.name should not contain dots)
        config
            .table
            .validate()
            .map_err(|e| SinkError::Config(anyhow!(e)))?;

        if config.write_parquet_max_row_group_rows.is_some() {
            tracing::warn!(
                "`compaction.write_parquet_max_row_group_rows` is deprecated and ignored; use `compaction.write_parquet_max_row_group_bytes` instead"
            );
        }

        Ok(config)
    }

    pub fn catalog_type(&self) -> &str {
        self.common.catalog_type()
    }

    pub async fn load_table(&self) -> Result<Table> {
        self.common
            .load_table(&self.table, &self.java_catalog_props)
            .await
            .map_err(Into::into)
    }

    pub async fn create_catalog(&self) -> Result<Arc<dyn Catalog>> {
        self.common
            .create_catalog(&self.java_catalog_props)
            .await
            .map_err(Into::into)
    }

    pub fn full_table_name(&self) -> Result<TableIdent> {
        self.table.to_table_ident().map_err(Into::into)
    }

    pub fn catalog_name(&self) -> String {
        self.common.catalog_name()
    }

    pub fn table_format_version(&self) -> FormatVersion {
        self.format_version
    }

    pub fn compaction_interval_sec(&self) -> u64 {
        // default to 1 hour
        self.compaction_interval_sec.unwrap_or(3600)
    }

    /// Calculate the timestamp (in milliseconds) before which snapshots should be expired
    /// Returns `current_time_ms` - `max_age_millis`
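    /// For example, `current_time_ms = 1_000_000_000` with `max_age_millis = 3_600_000` yields `996_400_000`.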
    pub fn snapshot_expiration_timestamp_ms(&self, current_time_ms: i64) -> Option<i64> {
        self.snapshot_expiration_max_age_millis
            .map(|max_age_millis| current_time_ms - max_age_millis)
    }

    pub fn trigger_snapshot_count(&self) -> usize {
        self.trigger_snapshot_count.unwrap_or(usize::MAX)
    }

    pub fn small_files_threshold_mb(&self) -> u64 {
        self.small_files_threshold_mb.unwrap_or(64)
    }

    pub fn delete_files_count_threshold(&self) -> usize {
        self.delete_files_count_threshold.unwrap_or(256)
    }

    pub fn target_file_size_mb(&self) -> u64 {
        self.target_file_size_mb.unwrap_or(1024)
    }

    pub fn commit_checkpoint_size_threshold_bytes(&self) -> Option<u64> {
        self.commit_checkpoint_size_threshold_mb
            .map(|threshold_mb| threshold_mb.saturating_mul(1024 * 1024))
    }

    /// Get the compaction type.
    /// Defaults to `full` when not configured.
    pub fn compaction_type(&self) -> CompactionType {
        self.compaction_type.unwrap_or_default()
    }

    /// Get the parquet compression codec
    /// Default is "zstd"
    pub fn write_parquet_compression(&self) -> &str {
        self.write_parquet_compression.as_deref().unwrap_or("zstd")
    }

    /// Get the maximum number of rows in a Parquet row group.
    pub fn write_parquet_max_row_group_rows(&self) -> Option<usize> {
        self.write_parquet_max_row_group_rows
    }

    /// Get the maximum size in bytes of a Parquet row group.
    pub fn write_parquet_max_row_group_bytes(&self) -> Option<usize> {
        self.write_parquet_max_row_group_bytes
            .or(Some(ICEBERG_DEFAULT_WRITE_PARQUET_MAX_ROW_GROUP_BYTES))
    }

    /// Parse the compression codec string into Parquet Compression enum.
    /// Invalid values fall back to SNAPPY.
    pub fn get_parquet_compression(&self) -> Compression {
        parse_parquet_compression(self.write_parquet_compression())
    }
}

/// Parse compression codec string to Parquet Compression enum
pub fn parse_parquet_compression(codec: &str) -> Compression {
    match codec.to_lowercase().as_str() {
        "uncompressed" => Compression::UNCOMPRESSED,
        "snappy" => Compression::SNAPPY,
        "gzip" => Compression::GZIP(Default::default()),
        "lzo" => Compression::LZO,
        "brotli" => Compression::BROTLI(Default::default()),
        "lz4" => Compression::LZ4,
        "zstd" => Compression::ZSTD(Default::default()),
        _ => {
            tracing::warn!(
                "Unknown compression codec '{}', falling back to SNAPPY",
                codec
            );
            Compression::SNAPPY
        }
    }
}

// Helper Functions

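/// Branch that the sink commits to: copy-on-write upsert sinks commit to the dedicated
/// `ingestion` branch; everything else commits to the table's main branch.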
pub fn commit_branch(sink_type: &str, write_mode: IcebergWriteMode) -> String {
    if should_enable_iceberg_cow(sink_type, write_mode) {
        ICEBERG_COW_BRANCH.to_owned()
    } else {
        MAIN_BRANCH.to_owned()
    }
}

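/// Copy-on-write behavior is enabled only for upsert sinks configured with the
/// `copy-on-write` write mode.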
pub fn should_enable_iceberg_cow(sink_type: &str, write_mode: IcebergWriteMode) -> bool {
    sink_type == SINK_TYPE_UPSERT && write_mode == IcebergWriteMode::CopyOnWrite
}

impl crate::with_options::WithOptions for IcebergWriteMode {}

impl crate::with_options::WithOptions for FormatVersion {}

impl crate::with_options::WithOptions for CompactionType {}
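
// A minimal sketch of unit tests for the pure helpers in this module, assuming the crate's
// standard `cargo test` setup; the test names and cases below are illustrative additions,
// not part of the original sink behavior.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn write_mode_round_trip() {
        let mode = ICEBERG_WRITE_MODE_COPY_ON_WRITE
            .parse::<IcebergWriteMode>()
            .unwrap();
        assert_eq!(mode, IcebergWriteMode::CopyOnWrite);
        assert_eq!(mode.to_string(), ICEBERG_WRITE_MODE_COPY_ON_WRITE);
        assert!("unknown".parse::<IcebergWriteMode>().is_err());
    }

    #[test]
    fn compaction_type_parsing() {
        assert_eq!(
            CompactionType::try_from(ICEBERG_COMPACTION_TYPE_SMALL_FILES).unwrap(),
            CompactionType::SmallFiles
        );
        assert!(CompactionType::try_from("minor").is_err());
    }

    #[test]
    fn format_version_parsing() {
        assert_eq!(parse_format_version_str("2").unwrap(), FormatVersion::V2);
        assert_eq!(parse_format_version_str(" 3 ").unwrap(), FormatVersion::V3);
        assert!(parse_format_version_str("4").is_err());
    }

    #[test]
    fn parquet_compression_fallback() {
        assert_eq!(parse_parquet_compression("snappy"), Compression::SNAPPY);
        // Unknown codecs fall back to SNAPPY instead of erroring.
        assert_eq!(parse_parquet_compression("invalid"), Compression::SNAPPY);
    }

    #[test]
    fn commit_branch_selection() {
        // Copy-on-write upsert sinks commit to the dedicated ingestion branch.
        assert_eq!(
            commit_branch(SINK_TYPE_UPSERT, IcebergWriteMode::CopyOnWrite),
            ICEBERG_COW_BRANCH
        );
        // Everything else commits to the main branch.
        assert_eq!(
            commit_branch(SINK_TYPE_APPEND_ONLY, IcebergWriteMode::MergeOnRead),
            MAIN_BRANCH
        );
    }
}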