Skip to main content

risingwave_connector/sink/iceberg/
config.rs

1// Copyright 2026 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, HashMap};
16use std::fmt::Debug;
17use std::sync::Arc;
18
19use anyhow::anyhow;
20use iceberg::spec::{FormatVersion, MAIN_BRANCH};
21use iceberg::table::Table;
22use iceberg::{Catalog, TableIdent};
23use parquet::basic::Compression;
24use serde::de::{self, Deserializer, Visitor};
25use serde::{Deserialize, Serialize};
26use serde_with::{DisplayFromStr, serde_as};
27use with_options::WithOptions;
28
29use super::{SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT, SinkError};
30use crate::connector_common::{IcebergCommon, IcebergTableIdentifier};
31use crate::enforce_secret::EnforceSecret;
32use crate::sink::Result;
33use crate::sink::decouple_checkpoint_log_sink::iceberg_default_commit_checkpoint_interval;
34use crate::{deserialize_bool_from_string, deserialize_optional_string_seq_from_string};
35
/// Branch name returned by `commit_branch` for upsert sinks running in copy-on-write mode.
pub const ICEBERG_COW_BRANCH: &str = "ingestion";

/// Option string for `IcebergWriteMode::MergeOnRead`.
pub const ICEBERG_WRITE_MODE_MERGE_ON_READ: &str = "merge-on-read";
/// Option string for `IcebergWriteMode::CopyOnWrite`.
pub const ICEBERG_WRITE_MODE_COPY_ON_WRITE: &str = "copy-on-write";
/// Option string for `CompactionType::Full`.
pub const ICEBERG_COMPACTION_TYPE_FULL: &str = "full";
/// Option string for `CompactionType::SmallFiles`.
pub const ICEBERG_COMPACTION_TYPE_SMALL_FILES: &str = "small-files";
/// Option string for `CompactionType::FilesWithDelete`.
pub const ICEBERG_COMPACTION_TYPE_FILES_WITH_DELETE: &str = "files-with-delete";

// NOTE(review): not referenced in this file; presumably the first field id used for
// partition data fields — confirm against the users of this constant.
pub const PARTITION_DATA_ID_START: i32 = 1000;
45
/// Write mode of the Iceberg sink.
///
/// Serialized in kebab-case (`merge-on-read`, `copy-on-write`); the default is
/// `MergeOnRead`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
pub enum IcebergWriteMode {
    /// `merge-on-read`, the default mode.
    #[default]
    MergeOnRead,
    /// `copy-on-write`; rejected for append-only sinks by
    /// `IcebergConfig::validate_append_only_write_mode`.
    CopyOnWrite,
}
53
54impl IcebergWriteMode {
55    pub fn as_str(self) -> &'static str {
56        match self {
57            IcebergWriteMode::MergeOnRead => ICEBERG_WRITE_MODE_MERGE_ON_READ,
58            IcebergWriteMode::CopyOnWrite => ICEBERG_WRITE_MODE_COPY_ON_WRITE,
59        }
60    }
61}
62
63impl std::str::FromStr for IcebergWriteMode {
64    type Err = SinkError;
65
66    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
67        match s {
68            ICEBERG_WRITE_MODE_MERGE_ON_READ => Ok(IcebergWriteMode::MergeOnRead),
69            ICEBERG_WRITE_MODE_COPY_ON_WRITE => Ok(IcebergWriteMode::CopyOnWrite),
70            _ => Err(SinkError::Config(anyhow!(format!(
71                "invalid write_mode: {}, must be one of: {}, {}",
72                s, ICEBERG_WRITE_MODE_MERGE_ON_READ, ICEBERG_WRITE_MODE_COPY_ON_WRITE
73            )))),
74        }
75    }
76}
77
78impl TryFrom<&str> for IcebergWriteMode {
79    type Error = <Self as std::str::FromStr>::Err;
80
81    fn try_from(value: &str) -> std::result::Result<Self, Self::Error> {
82        value.parse()
83    }
84}
85
86impl TryFrom<String> for IcebergWriteMode {
87    type Error = <Self as std::str::FromStr>::Err;
88
89    fn try_from(value: String) -> std::result::Result<Self, Self::Error> {
90        value.as_str().parse()
91    }
92}
93
94impl std::fmt::Display for IcebergWriteMode {
95    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
96        f.write_str(self.as_str())
97    }
98}
99
// Configuration constants
//
// Option keys of the Iceberg sink. Each string below equals the serde name of the
// corresponding `IcebergConfig` field.
pub const ENABLE_COMPACTION: &str = "enable_compaction";
pub const COMPACTION_INTERVAL_SEC: &str = "compaction_interval_sec";
pub const ENABLE_SNAPSHOT_EXPIRATION: &str = "enable_snapshot_expiration";
pub const WRITE_MODE: &str = "write_mode";
pub const FORMAT_VERSION: &str = "format_version";
pub const SNAPSHOT_EXPIRATION_RETAIN_LAST: &str = "snapshot_expiration_retain_last";
pub const SNAPSHOT_EXPIRATION_MAX_AGE_MILLIS: &str = "snapshot_expiration_max_age_millis";
pub const SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_FILES: &str = "snapshot_expiration_clear_expired_files";
pub const SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_META_DATA: &str =
    "snapshot_expiration_clear_expired_meta_data";
// `from_btreemap` checks for this key to decide whether to apply
// `DEFAULT_COMPACTION_MAX_SNAPSHOTS_NUM`.
pub const COMPACTION_MAX_SNAPSHOTS_NUM: &str = "compaction.max_snapshots_num";

pub const COMPACTION_SMALL_FILES_THRESHOLD_MB: &str = "compaction.small_files_threshold_mb";

pub const COMPACTION_DELETE_FILES_COUNT_THRESHOLD: &str = "compaction.delete_files_count_threshold";

pub const COMPACTION_TRIGGER_SNAPSHOT_COUNT: &str = "compaction.trigger_snapshot_count";

pub const COMPACTION_TARGET_FILE_SIZE_MB: &str = "compaction.target_file_size_mb";

pub const COMPACTION_TYPE: &str = "compaction.type";

pub const COMPACTION_WRITE_PARQUET_COMPRESSION: &str = "compaction.write_parquet_compression";
pub const COMPACTION_WRITE_PARQUET_MAX_ROW_GROUP_ROWS: &str =
    "compaction.write_parquet_max_row_group_rows";
pub const COMPACTION_WRITE_PARQUET_MAX_ROW_GROUP_BYTES: &str =
    "compaction.write_parquet_max_row_group_bytes";
pub const ORDER_KEY: &str = "order_key";
/// Value given to `compaction.max_snapshots_num` by `from_btreemap` when compaction is
/// enabled and the option is not set.
pub const DEFAULT_COMPACTION_MAX_SNAPSHOTS_NUM: usize = 1000;
/// Row-group byte limit (128 MiB) returned by
/// `IcebergConfig::write_parquet_max_row_group_bytes` when the option is not set.
pub const ICEBERG_DEFAULT_WRITE_PARQUET_MAX_ROW_GROUP_BYTES: usize = 128 * 1024 * 1024;
pub const ENABLE_PK_INDEX: &str = "enable_pk_index";

// "risingwave version <crate version>", built at compile time.
// NOTE(review): not used in this file; presumably written as the Parquet `created_by`
// metadata by the writer — confirm at the use site.
pub(super) const PARQUET_CREATED_BY: &str =
    concat!("risingwave version ", env!("CARGO_PKG_VERSION"));
135
/// Serde default for the `commit_retry_num` option (8).
fn default_commit_retry_num() -> u32 {
    const DEFAULT_COMMIT_RETRY_NUM: u32 = 8;
    DEFAULT_COMMIT_RETRY_NUM
}
139
140fn default_iceberg_write_mode() -> IcebergWriteMode {
141    IcebergWriteMode::MergeOnRead
142}
143
144fn default_iceberg_format_version() -> FormatVersion {
145    FormatVersion::V2
146}
147
/// Serde default for boolean options that are enabled unless explicitly turned off.
fn default_true() -> bool {
    const ENABLED: bool = true;
    ENABLED
}
151
/// Serde default for optional boolean options that are enabled when not set.
fn default_some_true() -> Option<bool> {
    let enabled = true;
    Some(enabled)
}
155
156fn parse_format_version_str(value: &str) -> std::result::Result<FormatVersion, String> {
157    let parsed = value
158        .trim()
159        .parse::<u8>()
160        .map_err(|_| "`format-version` must be one of 1, 2, or 3".to_owned())?;
161    match parsed {
162        1 => Ok(FormatVersion::V1),
163        2 => Ok(FormatVersion::V2),
164        3 => Ok(FormatVersion::V3),
165        _ => Err("`format-version` must be one of 1, 2, or 3".to_owned()),
166    }
167}
168
169fn deserialize_format_version<'de, D>(
170    deserializer: D,
171) -> std::result::Result<FormatVersion, D::Error>
172where
173    D: Deserializer<'de>,
174{
175    struct FormatVersionVisitor;
176
177    impl<'de> Visitor<'de> for FormatVersionVisitor {
178        type Value = FormatVersion;
179
180        fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
181            formatter.write_str("format-version as 1, 2, or 3")
182        }
183
184        fn visit_u64<E>(self, value: u64) -> std::result::Result<Self::Value, E>
185        where
186            E: de::Error,
187        {
188            let value = u8::try_from(value)
189                .map_err(|_| E::custom("`format-version` must be one of 1, 2, or 3"))?;
190            parse_format_version_str(&value.to_string()).map_err(E::custom)
191        }
192
193        fn visit_i64<E>(self, value: i64) -> std::result::Result<Self::Value, E>
194        where
195            E: de::Error,
196        {
197            let value = u8::try_from(value)
198                .map_err(|_| E::custom("`format-version` must be one of 1, 2, or 3"))?;
199            parse_format_version_str(&value.to_string()).map_err(E::custom)
200        }
201
202        fn visit_str<E>(self, value: &str) -> std::result::Result<Self::Value, E>
203        where
204            E: de::Error,
205        {
206            parse_format_version_str(value).map_err(E::custom)
207        }
208
209        fn visit_string<E>(self, value: String) -> std::result::Result<Self::Value, E>
210        where
211            E: de::Error,
212        {
213            self.visit_str(&value)
214        }
215    }
216
217    deserializer.deserialize_any(FormatVersionVisitor)
218}
219
/// Compaction type for Iceberg sink
///
/// Serialized in kebab-case (`full`, `small-files`, `files-with-delete`); the default is
/// `Full`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(rename_all = "kebab-case")]
pub enum CompactionType {
    /// Full compaction - rewrites all data files
    #[default]
    Full,
    /// Small files compaction - only compact small files
    SmallFiles,
    /// Files with delete compaction - only compact files that have associated delete files
    FilesWithDelete,
}
232
233impl CompactionType {
234    pub fn as_str(&self) -> &'static str {
235        match self {
236            CompactionType::Full => ICEBERG_COMPACTION_TYPE_FULL,
237            CompactionType::SmallFiles => ICEBERG_COMPACTION_TYPE_SMALL_FILES,
238            CompactionType::FilesWithDelete => ICEBERG_COMPACTION_TYPE_FILES_WITH_DELETE,
239        }
240    }
241}
242
243impl std::str::FromStr for CompactionType {
244    type Err = SinkError;
245
246    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
247        match s {
248            ICEBERG_COMPACTION_TYPE_FULL => Ok(CompactionType::Full),
249            ICEBERG_COMPACTION_TYPE_SMALL_FILES => Ok(CompactionType::SmallFiles),
250            ICEBERG_COMPACTION_TYPE_FILES_WITH_DELETE => Ok(CompactionType::FilesWithDelete),
251            _ => Err(SinkError::Config(anyhow!(format!(
252                "invalid compaction_type: {}, must be one of: {}, {}, {}",
253                s,
254                ICEBERG_COMPACTION_TYPE_FULL,
255                ICEBERG_COMPACTION_TYPE_SMALL_FILES,
256                ICEBERG_COMPACTION_TYPE_FILES_WITH_DELETE
257            )))),
258        }
259    }
260}
261
262impl TryFrom<&str> for CompactionType {
263    type Error = <Self as std::str::FromStr>::Err;
264
265    fn try_from(value: &str) -> std::result::Result<Self, Self::Error> {
266        value.parse()
267    }
268}
269
270impl TryFrom<String> for CompactionType {
271    type Error = <Self as std::str::FromStr>::Err;
272
273    fn try_from(value: String) -> std::result::Result<Self, Self::Error> {
274        value.as_str().parse()
275    }
276}
277
278impl std::fmt::Display for CompactionType {
279    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
280        write!(f, "{}", self.as_str())
281    }
282}
283
284#[serde_as]
285#[derive(Debug, Clone, PartialEq, Eq, Deserialize, WithOptions)]
286pub struct IcebergConfig {
287    pub r#type: String, // accept "append-only" or "upsert"
288
289    #[serde(default, deserialize_with = "deserialize_bool_from_string")]
290    pub force_append_only: bool,
291
292    #[serde(flatten)]
293    pub(crate) common: IcebergCommon,
294
295    #[serde(flatten)]
296    pub(crate) table: IcebergTableIdentifier,
297
298    #[serde(
299        rename = "primary_key",
300        default,
301        deserialize_with = "deserialize_optional_string_seq_from_string"
302    )]
303    pub primary_key: Option<Vec<String>>,
304
305    // Props for java catalog props.
306    #[serde(skip)]
307    pub java_catalog_props: HashMap<String, String>,
308
309    #[serde(default)]
310    pub partition_by: Option<String>,
311
312    #[serde(default)]
313    pub order_key: Option<String>,
314
315    /// Commit every n(>0) checkpoints, default is 60.
316    #[serde(default = "iceberg_default_commit_checkpoint_interval")]
317    #[serde_as(as = "DisplayFromStr")]
318    #[with_option(allow_alter_on_fly)]
319    pub commit_checkpoint_interval: u64,
320
321    #[serde(default, deserialize_with = "deserialize_bool_from_string")]
322    pub create_table_if_not_exists: bool,
323
324    /// Whether it is `exactly_once`, the default is true.
325    #[serde(default = "default_some_true")]
326    #[serde_as(as = "Option<DisplayFromStr>")]
327    pub is_exactly_once: Option<bool>,
328    // Retry commit num when iceberg commit fail. default is 8.
329    // # TODO
330    // Iceberg table may store the retry commit num in table meta.
331    // We should try to find and use that as default commit retry num first.
332    #[serde(default = "default_commit_retry_num")]
333    pub commit_retry_num: u32,
334
335    /// Whether to enable iceberg compaction.
336    #[serde(
337        rename = "enable_compaction",
338        default,
339        deserialize_with = "deserialize_bool_from_string"
340    )]
341    #[with_option(allow_alter_on_fly)]
342    pub enable_compaction: bool,
343
344    /// The interval of iceberg compaction
345    #[serde(rename = "compaction_interval_sec", default)]
346    #[serde_as(as = "Option<DisplayFromStr>")]
347    #[with_option(allow_alter_on_fly)]
348    pub compaction_interval_sec: Option<u64>,
349
350    /// Whether to enable iceberg expired snapshots.
351    #[serde(
352        rename = "enable_snapshot_expiration",
353        default = "default_true",
354        deserialize_with = "deserialize_bool_from_string"
355    )]
356    #[with_option(allow_alter_on_fly)]
357    pub enable_snapshot_expiration: bool,
358
359    /// The iceberg write mode, can be `merge-on-read` or `copy-on-write`.
360    #[serde(rename = "write_mode", default = "default_iceberg_write_mode")]
361    pub write_mode: IcebergWriteMode,
362
363    /// Iceberg format version for table creation.
364    #[serde(
365        rename = "format_version",
366        default = "default_iceberg_format_version",
367        deserialize_with = "deserialize_format_version"
368    )]
369    pub format_version: FormatVersion,
370
371    /// The maximum age (in milliseconds) for snapshots before they expire
372    /// For example, if set to 3600000, snapshots older than 1 hour will be expired
373    #[serde(rename = "snapshot_expiration_max_age_millis", default)]
374    #[serde_as(as = "Option<DisplayFromStr>")]
375    #[with_option(allow_alter_on_fly)]
376    pub snapshot_expiration_max_age_millis: Option<i64>,
377
378    /// The number of snapshots to retain
379    #[serde(rename = "snapshot_expiration_retain_last", default)]
380    #[serde_as(as = "Option<DisplayFromStr>")]
381    #[with_option(allow_alter_on_fly)]
382    pub snapshot_expiration_retain_last: Option<i32>,
383
384    #[serde(
385        rename = "snapshot_expiration_clear_expired_files",
386        default = "default_true",
387        deserialize_with = "deserialize_bool_from_string"
388    )]
389    #[with_option(allow_alter_on_fly)]
390    pub snapshot_expiration_clear_expired_files: bool,
391
392    #[serde(
393        rename = "snapshot_expiration_clear_expired_meta_data",
394        default = "default_true",
395        deserialize_with = "deserialize_bool_from_string"
396    )]
397    #[with_option(allow_alter_on_fly)]
398    pub snapshot_expiration_clear_expired_meta_data: bool,
399
400    /// The maximum number of snapshots allowed since the last rewrite operation
401    /// If set, sink will check snapshot count and wait if exceeded
402    /// If unset, defaults to 1000 only when compaction is enabled
403    #[serde(rename = "compaction.max_snapshots_num", default)]
404    #[serde_as(as = "Option<DisplayFromStr>")]
405    #[with_option(allow_alter_on_fly)]
406    pub max_snapshots_num_before_compaction: Option<usize>,
407
408    #[serde(rename = "compaction.small_files_threshold_mb", default)]
409    #[serde_as(as = "Option<DisplayFromStr>")]
410    #[with_option(allow_alter_on_fly)]
411    pub small_files_threshold_mb: Option<u64>,
412
413    #[serde(rename = "compaction.delete_files_count_threshold", default)]
414    #[serde_as(as = "Option<DisplayFromStr>")]
415    #[with_option(allow_alter_on_fly)]
416    pub delete_files_count_threshold: Option<usize>,
417
418    #[serde(rename = "compaction.trigger_snapshot_count", default)]
419    #[serde_as(as = "Option<DisplayFromStr>")]
420    #[with_option(allow_alter_on_fly)]
421    pub trigger_snapshot_count: Option<usize>,
422
423    #[serde(rename = "compaction.target_file_size_mb", default)]
424    #[serde_as(as = "Option<DisplayFromStr>")]
425    #[with_option(allow_alter_on_fly)]
426    pub target_file_size_mb: Option<u64>,
427
428    /// Compaction type: `full`, `small-files`, or `files-with-delete`
429    /// If not set, will default to `full`
430    #[serde(rename = "compaction.type", default)]
431    #[with_option(allow_alter_on_fly)]
432    pub compaction_type: Option<CompactionType>,
433
434    /// Parquet compression codec
435    /// Supported values: uncompressed, snappy, gzip, lzo, brotli, lz4, zstd
436    /// Default is zstd
437    #[serde(rename = "compaction.write_parquet_compression", default)]
438    #[with_option(allow_alter_on_fly)]
439    pub write_parquet_compression: Option<String>,
440
441    /// Deprecated: maximum number of rows in a Parquet row group.
442    /// Accepted for backward compatibility, but ignored by the writer.
443    #[serde(rename = "compaction.write_parquet_max_row_group_rows", default)]
444    #[serde_as(as = "Option<DisplayFromStr>")]
445    #[with_option(allow_alter_on_fly)]
446    pub write_parquet_max_row_group_rows: Option<usize>,
447
448    /// Maximum size of a Parquet row group in bytes
449    /// Default is 128 `MiB`, matching Iceberg defaults.
450    #[serde(rename = "compaction.write_parquet_max_row_group_bytes", default)]
451    #[serde_as(as = "Option<DisplayFromStr>")]
452    #[with_option(allow_alter_on_fly)]
453    pub write_parquet_max_row_group_bytes: Option<usize>,
454
455    /// Whether to enable PK index for upsert sink. Default is false.
456    /// It's used for V3 upsert iceberg sink to generate delete vectors.
457    #[serde(
458        rename = "enable_pk_index",
459        default,
460        deserialize_with = "deserialize_bool_from_string"
461    )]
462    pub enable_pk_index: bool,
463}
464
465impl EnforceSecret for IcebergConfig {
466    fn enforce_secret<'a>(
467        prop_iter: impl Iterator<Item = &'a str>,
468    ) -> crate::error::ConnectorResult<()> {
469        for prop in prop_iter {
470            IcebergCommon::enforce_one(prop)?;
471        }
472        Ok(())
473    }
474
475    fn enforce_one(prop: &str) -> crate::error::ConnectorResult<()> {
476        IcebergCommon::enforce_one(prop)
477    }
478}
479
480impl IcebergConfig {
481    /// Validate that append-only sinks use merge-on-read mode
482    /// Copy-on-write is strictly worse than merge-on-read for append-only workloads
483    pub fn validate_append_only_write_mode(
484        sink_type: &str,
485        write_mode: IcebergWriteMode,
486    ) -> Result<()> {
487        if sink_type == SINK_TYPE_APPEND_ONLY && write_mode == IcebergWriteMode::CopyOnWrite {
488            return Err(SinkError::Config(anyhow!(
489                "'copy-on-write' mode is not supported for append-only iceberg sink. \
490                 Please use 'merge-on-read' instead, which is strictly better for append-only workloads."
491            )));
492        }
493        Ok(())
494    }
495
496    pub(crate) fn validate_enable_pk_index(&self) -> Result<()> {
497        if !self.enable_pk_index {
498            return Ok(());
499        }
500
501        if self.r#type != SINK_TYPE_UPSERT {
502            return Err(SinkError::Config(anyhow!(
503                "`enable_pk_index` is only supported for upsert iceberg sink"
504            )));
505        }
506
507        if self.write_mode != IcebergWriteMode::MergeOnRead {
508            return Err(SinkError::Config(anyhow!(
509                "`enable_pk_index` is only supported for upsert iceberg sink with merge-on-read mode"
510            )));
511        }
512
513        if self.format_version < FormatVersion::V3 {
514            return Err(SinkError::Config(anyhow!(
515                "`enable_pk_index` is only supported for upsert iceberg sink with format version >= 3"
516            )));
517        }
518
519        if self.force_append_only {
520            return Err(SinkError::Config(anyhow!(
521                "`enable_pk_index` cannot be true when `force_append_only` is true"
522            )));
523        }
524
525        Ok(())
526    }
527
528    pub fn from_btreemap(values: BTreeMap<String, String>) -> Result<Self> {
529        let mut config =
530            serde_json::from_value::<IcebergConfig>(serde_json::to_value(&values).unwrap())
531                .map_err(|e| SinkError::Config(anyhow!(e)))?;
532
533        if config.enable_compaction && !values.contains_key(COMPACTION_MAX_SNAPSHOTS_NUM) {
534            config.max_snapshots_num_before_compaction = Some(DEFAULT_COMPACTION_MAX_SNAPSHOTS_NUM);
535        }
536
537        if config.r#type != SINK_TYPE_APPEND_ONLY && config.r#type != SINK_TYPE_UPSERT {
538            return Err(SinkError::Config(anyhow!(
539                "`{}` must be {}, or {}",
540                SINK_TYPE_OPTION,
541                SINK_TYPE_APPEND_ONLY,
542                SINK_TYPE_UPSERT
543            )));
544        }
545
546        if config.r#type == SINK_TYPE_UPSERT {
547            if let Some(primary_key) = &config.primary_key {
548                if primary_key.is_empty() {
549                    return Err(SinkError::Config(anyhow!(
550                        "`primary-key` must not be empty in {}",
551                        SINK_TYPE_UPSERT
552                    )));
553                }
554            } else {
555                return Err(SinkError::Config(anyhow!(
556                    "Must set `primary-key` in {}",
557                    SINK_TYPE_UPSERT
558                )));
559            }
560        }
561
562        // Enforce merge-on-read for append-only sinks
563        Self::validate_append_only_write_mode(&config.r#type, config.write_mode)?;
564        config.validate_enable_pk_index()?;
565
566        // All configs start with "catalog." will be treated as java configs.
567        config.java_catalog_props = values
568            .iter()
569            .filter(|(k, _v)| {
570                k.starts_with("catalog.")
571                    && k != &"catalog.uri"
572                    && k != &"catalog.type"
573                    && k != &"catalog.name"
574                    && k != &"catalog.header"
575            })
576            .map(|(k, v)| (k[8..].to_string(), v.clone()))
577            .collect();
578
579        if config.commit_checkpoint_interval == 0 {
580            return Err(SinkError::Config(anyhow!(
581                "`commit-checkpoint-interval` must be greater than 0"
582            )));
583        }
584
585        if config.trigger_snapshot_count == Some(0) {
586            return Err(SinkError::Config(anyhow!(
587                "`compaction.trigger_snapshot_count` must be greater than 0"
588            )));
589        }
590
591        if config.max_snapshots_num_before_compaction == Some(0) {
592            return Err(SinkError::Config(anyhow!(
593                "`compaction.max_snapshots_num` must be greater than 0"
594            )));
595        }
596
597        // Validate table identifier (e.g., database.name should not contain dots)
598        config
599            .table
600            .validate()
601            .map_err(|e| SinkError::Config(anyhow!(e)))?;
602
603        if config.write_parquet_max_row_group_rows.is_some() {
604            tracing::warn!(
605                "`compaction.write_parquet_max_row_group_rows` is deprecated and ignored; use `compaction.write_parquet_max_row_group_bytes` instead"
606            );
607        }
608
609        Ok(config)
610    }
611
612    pub fn catalog_type(&self) -> &str {
613        self.common.catalog_type()
614    }
615
    /// Loads the Iceberg table this sink is configured for.
    ///
    /// In test and madsim builds, a catalog type of `mock_v3` loads the table from the
    /// catalog returned by `mock_v3_catalog_registry::get()`. In every other case the call
    /// is delegated to `self.common.load_table` with the table identifier and the java
    /// catalog properties.
    ///
    /// # Errors
    /// On the mock path, returns `SinkError::Config` when no mock catalog is registered,
    /// the table name cannot be parsed, or the mock catalog fails to load the table.
    /// Errors from the delegated call are converted with `Into`.
    pub async fn load_table(&self) -> Result<Table> {
        // Compiled only for tests and madsim simulation.
        #[cfg(any(test, madsim))]
        if self.catalog_type() == "mock_v3" {
            let catalog =
                crate::sink::iceberg::mock_v3_catalog_registry::get().ok_or_else(|| {
                    SinkError::Config(anyhow!(
                        "mock_v3 catalog_type set but no mock catalog registered"
                    ))
                })?;
            let table_id = self
                .table
                .to_table_ident()
                .map_err(|e| SinkError::Config(anyhow!(e).context("Unable to parse table name")))?;
            let table = catalog
                .load_table(&table_id)
                .await
                .map_err(|e| SinkError::Config(anyhow!(e).context("Failed to load mock table")))?;
            return Ok(table);
        }
        self.common
            .load_table(&self.table, &self.java_catalog_props)
            .await
            .map_err(Into::into)
    }
640
    /// Creates the catalog used to access the configured table.
    ///
    /// In test and madsim builds, a catalog type of `mock_v3` returns the catalog from
    /// `mock_v3_catalog_registry::get()`. In every other case the call is delegated to
    /// `self.common.create_catalog` with the java catalog properties.
    ///
    /// # Errors
    /// On the mock path, fails when no mock catalog is registered. Errors from the
    /// delegated call are converted with `Into`.
    pub async fn create_catalog(&self) -> Result<Arc<dyn Catalog>> {
        // Compiled only for tests and madsim simulation.
        #[cfg(any(test, madsim))]
        if self.catalog_type() == "mock_v3" {
            return Ok(
                crate::sink::iceberg::mock_v3_catalog_registry::get().ok_or_else(|| {
                    anyhow::anyhow!("mock_v3 catalog_type set but no mock catalog registered")
                })?,
            );
        }
        self.common
            .create_catalog(&self.java_catalog_props)
            .await
            .map_err(Into::into)
    }
655
656    pub fn full_table_name(&self) -> Result<TableIdent> {
657        self.table.to_table_ident().map_err(Into::into)
658    }
659
660    pub fn catalog_name(&self) -> String {
661        self.common.catalog_name()
662    }
663
664    pub fn table_format_version(&self) -> FormatVersion {
665        self.format_version
666    }
667
668    pub fn compaction_interval_sec(&self) -> u64 {
669        // default to 1 hour
670        self.compaction_interval_sec.unwrap_or(3600)
671    }
672
673    /// Calculate the timestamp (in milliseconds) before which snapshots should be expired
674    /// Returns `current_time_ms` - `max_age_millis`
675    pub fn snapshot_expiration_timestamp_ms(&self, current_time_ms: i64) -> Option<i64> {
676        self.snapshot_expiration_max_age_millis
677            .map(|max_age_millis| current_time_ms - max_age_millis)
678    }
679
680    pub fn trigger_snapshot_count(&self) -> usize {
681        self.trigger_snapshot_count.unwrap_or(usize::MAX)
682    }
683
684    pub fn small_files_threshold_mb(&self) -> u64 {
685        self.small_files_threshold_mb.unwrap_or(64)
686    }
687
688    pub fn delete_files_count_threshold(&self) -> usize {
689        self.delete_files_count_threshold.unwrap_or(256)
690    }
691
692    pub fn target_file_size_mb(&self) -> u64 {
693        self.target_file_size_mb.unwrap_or(1024)
694    }
695
696    /// Get the compaction type as an enum
697    /// This method parses the string and returns the enum value
698    pub fn compaction_type(&self) -> CompactionType {
699        self.compaction_type.unwrap_or_default()
700    }
701
702    /// Get the parquet compression codec
703    /// Default is "zstd"
704    pub fn write_parquet_compression(&self) -> &str {
705        self.write_parquet_compression.as_deref().unwrap_or("zstd")
706    }
707
708    /// Get the maximum number of rows in a Parquet row group.
709    pub fn write_parquet_max_row_group_rows(&self) -> Option<usize> {
710        self.write_parquet_max_row_group_rows
711    }
712
713    /// Get the maximum size in bytes of a Parquet row group.
714    pub fn write_parquet_max_row_group_bytes(&self) -> Option<usize> {
715        self.write_parquet_max_row_group_bytes
716            .or(Some(ICEBERG_DEFAULT_WRITE_PARQUET_MAX_ROW_GROUP_BYTES))
717    }
718
719    /// Parse the compression codec string into Parquet Compression enum.
720    /// Invalid values fall back to SNAPPY.
721    pub fn get_parquet_compression(&self) -> Compression {
722        parse_parquet_compression(self.write_parquet_compression())
723    }
724}
725
726/// Parse compression codec string to Parquet Compression enum
727pub fn parse_parquet_compression(codec: &str) -> Compression {
728    match codec.to_lowercase().as_str() {
729        "uncompressed" => Compression::UNCOMPRESSED,
730        "snappy" => Compression::SNAPPY,
731        "gzip" => Compression::GZIP(Default::default()),
732        "lzo" => Compression::LZO,
733        "brotli" => Compression::BROTLI(Default::default()),
734        "lz4" => Compression::LZ4,
735        "zstd" => Compression::ZSTD(Default::default()),
736        _ => {
737            tracing::warn!(
738                "Unknown compression codec '{}', falling back to SNAPPY",
739                codec
740            );
741            Compression::SNAPPY
742        }
743    }
744}
745
746// Helper Functions
747
748pub fn commit_branch(sink_type: &str, write_mode: IcebergWriteMode) -> String {
749    if should_enable_iceberg_cow(sink_type, write_mode) {
750        ICEBERG_COW_BRANCH.to_owned()
751    } else {
752        MAIN_BRANCH.to_owned()
753    }
754}
755
756pub fn should_enable_iceberg_cow(sink_type: &str, write_mode: IcebergWriteMode) -> bool {
757    sink_type == SINK_TYPE_UPSERT && write_mode == IcebergWriteMode::CopyOnWrite
758}
759
// Empty `WithOptions` impls for the non-primitive field types of `IcebergConfig`.
// NOTE(review): presumably required by the `WithOptions` derive on `IcebergConfig` for
// every field type — confirm against the derive macro.
impl crate::with_options::WithOptions for IcebergWriteMode {}

impl crate::with_options::WithOptions for FormatVersion {}

impl crate::with_options::WithOptions for CompactionType {}