1use std::collections::{BTreeMap, HashMap};
16use std::fmt::Debug;
17use std::sync::Arc;
18
19use anyhow::anyhow;
20use iceberg::spec::{FormatVersion, MAIN_BRANCH};
21use iceberg::table::Table;
22use iceberg::{Catalog, TableIdent};
23use parquet::basic::Compression;
24use serde::de::{self, Deserializer, Visitor};
25use serde::{Deserialize, Serialize};
26use serde_with::{DisplayFromStr, serde_as};
27use with_options::WithOptions;
28
29use super::{SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT, SinkError};
30use crate::connector_common::{IcebergCommon, IcebergTableIdentifier};
31use crate::enforce_secret::EnforceSecret;
32use crate::sink::Result;
33use crate::sink::decouple_checkpoint_log_sink::iceberg_default_commit_checkpoint_interval;
34use crate::{deserialize_bool_from_string, deserialize_optional_string_seq_from_string};
35
/// Branch name returned by `commit_branch` when copy-on-write is in effect
/// (see `should_enable_iceberg_cow`).
pub const ICEBERG_COW_BRANCH: &str = "ingestion";

/// String form of [`IcebergWriteMode::MergeOnRead`].
pub const ICEBERG_WRITE_MODE_MERGE_ON_READ: &str = "merge-on-read";
/// String form of [`IcebergWriteMode::CopyOnWrite`].
pub const ICEBERG_WRITE_MODE_COPY_ON_WRITE: &str = "copy-on-write";
/// String form of [`CompactionType::Full`].
pub const ICEBERG_COMPACTION_TYPE_FULL: &str = "full";
/// String form of [`CompactionType::SmallFiles`].
pub const ICEBERG_COMPACTION_TYPE_SMALL_FILES: &str = "small-files";
/// String form of [`CompactionType::FilesWithDelete`].
pub const ICEBERG_COMPACTION_TYPE_FILES_WITH_DELETE: &str = "files-with-delete";

// NOTE(review): not referenced in this part of the file; presumably the first field id
// reserved for partition data columns — confirm against the writer code.
pub const PARTITION_DATA_ID_START: i32 = 1000;
45
/// Write mode of the iceberg sink.
///
/// Serde (de)serializes the variants in kebab-case, i.e. `merge-on-read` and
/// `copy-on-write`; the `Default` value is [`IcebergWriteMode::MergeOnRead`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
pub enum IcebergWriteMode {
    /// The default mode.
    #[default]
    MergeOnRead,
    /// Rejected for append-only sinks by `IcebergConfig::validate_append_only_write_mode`.
    CopyOnWrite,
}
53
54impl IcebergWriteMode {
55 pub fn as_str(self) -> &'static str {
56 match self {
57 IcebergWriteMode::MergeOnRead => ICEBERG_WRITE_MODE_MERGE_ON_READ,
58 IcebergWriteMode::CopyOnWrite => ICEBERG_WRITE_MODE_COPY_ON_WRITE,
59 }
60 }
61}
62
63impl std::str::FromStr for IcebergWriteMode {
64 type Err = SinkError;
65
66 fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
67 match s {
68 ICEBERG_WRITE_MODE_MERGE_ON_READ => Ok(IcebergWriteMode::MergeOnRead),
69 ICEBERG_WRITE_MODE_COPY_ON_WRITE => Ok(IcebergWriteMode::CopyOnWrite),
70 _ => Err(SinkError::Config(anyhow!(format!(
71 "invalid write_mode: {}, must be one of: {}, {}",
72 s, ICEBERG_WRITE_MODE_MERGE_ON_READ, ICEBERG_WRITE_MODE_COPY_ON_WRITE
73 )))),
74 }
75 }
76}
77
78impl TryFrom<&str> for IcebergWriteMode {
79 type Error = <Self as std::str::FromStr>::Err;
80
81 fn try_from(value: &str) -> std::result::Result<Self, Self::Error> {
82 value.parse()
83 }
84}
85
86impl TryFrom<String> for IcebergWriteMode {
87 type Error = <Self as std::str::FromStr>::Err;
88
89 fn try_from(value: String) -> std::result::Result<Self, Self::Error> {
90 value.as_str().parse()
91 }
92}
93
94impl std::fmt::Display for IcebergWriteMode {
95 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
96 f.write_str(self.as_str())
97 }
98}
99
// Option keys. Each string below equals the serde name of the `IcebergConfig` field noted
// next to it.

/// Key of `IcebergConfig::enable_compaction`.
pub const ENABLE_COMPACTION: &str = "enable_compaction";
/// Key of `IcebergConfig::compaction_interval_sec`.
pub const COMPACTION_INTERVAL_SEC: &str = "compaction_interval_sec";
/// Key of `IcebergConfig::enable_snapshot_expiration`.
pub const ENABLE_SNAPSHOT_EXPIRATION: &str = "enable_snapshot_expiration";
/// Key of `IcebergConfig::write_mode`.
pub const WRITE_MODE: &str = "write_mode";
/// Key of `IcebergConfig::format_version`.
pub const FORMAT_VERSION: &str = "format_version";
/// Key of `IcebergConfig::snapshot_expiration_retain_last`.
pub const SNAPSHOT_EXPIRATION_RETAIN_LAST: &str = "snapshot_expiration_retain_last";
/// Key of `IcebergConfig::snapshot_expiration_max_age_millis`.
pub const SNAPSHOT_EXPIRATION_MAX_AGE_MILLIS: &str = "snapshot_expiration_max_age_millis";
/// Key of `IcebergConfig::snapshot_expiration_clear_expired_files`.
pub const SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_FILES: &str = "snapshot_expiration_clear_expired_files";
/// Key of `IcebergConfig::snapshot_expiration_clear_expired_meta_data`.
pub const SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_META_DATA: &str =
    "snapshot_expiration_clear_expired_meta_data";
/// Key of `IcebergConfig::max_snapshots_num_before_compaction`; `from_btreemap` checks for its
/// presence to decide whether to apply `DEFAULT_COMPACTION_MAX_SNAPSHOTS_NUM`.
pub const COMPACTION_MAX_SNAPSHOTS_NUM: &str = "compaction.max_snapshots_num";

/// Key of `IcebergConfig::small_files_threshold_mb`.
pub const COMPACTION_SMALL_FILES_THRESHOLD_MB: &str = "compaction.small_files_threshold_mb";

/// Key of `IcebergConfig::delete_files_count_threshold`.
pub const COMPACTION_DELETE_FILES_COUNT_THRESHOLD: &str = "compaction.delete_files_count_threshold";

/// Key of `IcebergConfig::trigger_snapshot_count`.
pub const COMPACTION_TRIGGER_SNAPSHOT_COUNT: &str = "compaction.trigger_snapshot_count";

/// Key of `IcebergConfig::target_file_size_mb`.
pub const COMPACTION_TARGET_FILE_SIZE_MB: &str = "compaction.target_file_size_mb";

/// Key of `IcebergConfig::compaction_type`.
pub const COMPACTION_TYPE: &str = "compaction.type";

/// Key of `IcebergConfig::write_parquet_compression`.
pub const COMPACTION_WRITE_PARQUET_COMPRESSION: &str = "compaction.write_parquet_compression";
/// Key of `IcebergConfig::write_parquet_max_row_group_rows` (`from_btreemap` logs a deprecation
/// warning when it is set).
pub const COMPACTION_WRITE_PARQUET_MAX_ROW_GROUP_ROWS: &str =
    "compaction.write_parquet_max_row_group_rows";
/// Key of `IcebergConfig::write_parquet_max_row_group_bytes`.
pub const COMPACTION_WRITE_PARQUET_MAX_ROW_GROUP_BYTES: &str =
    "compaction.write_parquet_max_row_group_bytes";
/// Key of `IcebergConfig::order_key`.
pub const ORDER_KEY: &str = "order_key";
/// Value `from_btreemap` assigns to `max_snapshots_num_before_compaction` when compaction is
/// enabled and `compaction.max_snapshots_num` was not supplied.
pub const DEFAULT_COMPACTION_MAX_SNAPSHOTS_NUM: usize = 1000;
/// Fallback returned by `IcebergConfig::write_parquet_max_row_group_bytes` (128 MiB).
pub const ICEBERG_DEFAULT_WRITE_PARQUET_MAX_ROW_GROUP_BYTES: usize = 128 * 1024 * 1024;
/// Key of `IcebergConfig::enable_pk_index`.
pub const ENABLE_PK_INDEX: &str = "enable_pk_index";

// "risingwave version <crate version>", assembled at compile time.
// NOTE(review): presumably written as the parquet `created_by` metadata — confirm at the use site.
pub(super) const PARQUET_CREATED_BY: &str =
    concat!("risingwave version ", env!("CARGO_PKG_VERSION"));
135
/// Serde default for `IcebergConfig::commit_retry_num`.
fn default_commit_retry_num() -> u32 {
    const DEFAULT_COMMIT_RETRY_NUM: u32 = 8;
    DEFAULT_COMMIT_RETRY_NUM
}
139
140fn default_iceberg_write_mode() -> IcebergWriteMode {
141 IcebergWriteMode::MergeOnRead
142}
143
144fn default_iceberg_format_version() -> FormatVersion {
145 FormatVersion::V2
146}
147
/// Serde default for boolean options that are enabled unless explicitly turned off.
fn default_true() -> bool {
    const ENABLED: bool = true;
    ENABLED
}
151
/// Serde default for `IcebergConfig::is_exactly_once`: `Some(true)`.
fn default_some_true() -> Option<bool> {
    const ENABLED: bool = true;
    Some(ENABLED)
}
155
156fn parse_format_version_str(value: &str) -> std::result::Result<FormatVersion, String> {
157 let parsed = value
158 .trim()
159 .parse::<u8>()
160 .map_err(|_| "`format-version` must be one of 1, 2, or 3".to_owned())?;
161 match parsed {
162 1 => Ok(FormatVersion::V1),
163 2 => Ok(FormatVersion::V2),
164 3 => Ok(FormatVersion::V3),
165 _ => Err("`format-version` must be one of 1, 2, or 3".to_owned()),
166 }
167}
168
169fn deserialize_format_version<'de, D>(
170 deserializer: D,
171) -> std::result::Result<FormatVersion, D::Error>
172where
173 D: Deserializer<'de>,
174{
175 struct FormatVersionVisitor;
176
177 impl<'de> Visitor<'de> for FormatVersionVisitor {
178 type Value = FormatVersion;
179
180 fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
181 formatter.write_str("format-version as 1, 2, or 3")
182 }
183
184 fn visit_u64<E>(self, value: u64) -> std::result::Result<Self::Value, E>
185 where
186 E: de::Error,
187 {
188 let value = u8::try_from(value)
189 .map_err(|_| E::custom("`format-version` must be one of 1, 2, or 3"))?;
190 parse_format_version_str(&value.to_string()).map_err(E::custom)
191 }
192
193 fn visit_i64<E>(self, value: i64) -> std::result::Result<Self::Value, E>
194 where
195 E: de::Error,
196 {
197 let value = u8::try_from(value)
198 .map_err(|_| E::custom("`format-version` must be one of 1, 2, or 3"))?;
199 parse_format_version_str(&value.to_string()).map_err(E::custom)
200 }
201
202 fn visit_str<E>(self, value: &str) -> std::result::Result<Self::Value, E>
203 where
204 E: de::Error,
205 {
206 parse_format_version_str(value).map_err(E::custom)
207 }
208
209 fn visit_string<E>(self, value: String) -> std::result::Result<Self::Value, E>
210 where
211 E: de::Error,
212 {
213 self.visit_str(&value)
214 }
215 }
216
217 deserializer.deserialize_any(FormatVersionVisitor)
218}
219
/// Compaction strategy selectable through the `compaction.type` option.
///
/// Serde (de)serializes the variants in kebab-case (`full`, `small-files`,
/// `files-with-delete`); the `Default` value is [`CompactionType::Full`].
// NOTE(review): the per-variant behavior is implemented outside this part of the file; the
// variant notes below only restate the names — confirm details against the compaction code.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(rename_all = "kebab-case")]
pub enum CompactionType {
    /// `full` (default).
    #[default]
    Full,
    /// `small-files`.
    SmallFiles,
    /// `files-with-delete`.
    FilesWithDelete,
}
232
233impl CompactionType {
234 pub fn as_str(&self) -> &'static str {
235 match self {
236 CompactionType::Full => ICEBERG_COMPACTION_TYPE_FULL,
237 CompactionType::SmallFiles => ICEBERG_COMPACTION_TYPE_SMALL_FILES,
238 CompactionType::FilesWithDelete => ICEBERG_COMPACTION_TYPE_FILES_WITH_DELETE,
239 }
240 }
241}
242
243impl std::str::FromStr for CompactionType {
244 type Err = SinkError;
245
246 fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
247 match s {
248 ICEBERG_COMPACTION_TYPE_FULL => Ok(CompactionType::Full),
249 ICEBERG_COMPACTION_TYPE_SMALL_FILES => Ok(CompactionType::SmallFiles),
250 ICEBERG_COMPACTION_TYPE_FILES_WITH_DELETE => Ok(CompactionType::FilesWithDelete),
251 _ => Err(SinkError::Config(anyhow!(format!(
252 "invalid compaction_type: {}, must be one of: {}, {}, {}",
253 s,
254 ICEBERG_COMPACTION_TYPE_FULL,
255 ICEBERG_COMPACTION_TYPE_SMALL_FILES,
256 ICEBERG_COMPACTION_TYPE_FILES_WITH_DELETE
257 )))),
258 }
259 }
260}
261
262impl TryFrom<&str> for CompactionType {
263 type Error = <Self as std::str::FromStr>::Err;
264
265 fn try_from(value: &str) -> std::result::Result<Self, Self::Error> {
266 value.parse()
267 }
268}
269
270impl TryFrom<String> for CompactionType {
271 type Error = <Self as std::str::FromStr>::Err;
272
273 fn try_from(value: String) -> std::result::Result<Self, Self::Error> {
274 value.as_str().parse()
275 }
276}
277
278impl std::fmt::Display for CompactionType {
279 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
280 write!(f, "{}", self.as_str())
281 }
282}
283
/// Configuration of an iceberg sink, deserialized from string-valued options.
///
/// Build it with `IcebergConfig::from_btreemap`, which also applies the cross-field
/// validation that serde alone cannot express. Fields tagged `allow_alter_on_fly` carry the
/// `with_option` marker of the same name.
#[serde_as]
#[derive(Debug, Clone, PartialEq, Eq, Deserialize, WithOptions)]
pub struct IcebergConfig {
    /// Sink type; `from_btreemap` accepts only `SINK_TYPE_APPEND_ONLY` or `SINK_TYPE_UPSERT`.
    pub r#type: String,
    /// Parsed from a string boolean, default `false`. Cannot be combined with
    /// `enable_pk_index` (see `validate_enable_pk_index`).
    #[serde(default, deserialize_with = "deserialize_bool_from_string")]
    pub force_append_only: bool,

    /// Options flattened from `IcebergCommon`; used by `load_table`, `create_catalog`,
    /// `catalog_type` and `catalog_name`.
    #[serde(flatten)]
    pub(crate) common: IcebergCommon,

    /// Table identifier options, flattened; validated by `from_btreemap`.
    #[serde(flatten)]
    pub(crate) table: IcebergTableIdentifier,

    /// Primary key columns, parsed from a single string. Must be present and non-empty for
    /// upsert sinks.
    #[serde(
        rename = "primary_key",
        default,
        deserialize_with = "deserialize_optional_string_seq_from_string"
    )]
    pub primary_key: Option<Vec<String>>,

    /// Not deserialized. `from_btreemap` fills it from every `catalog.*` option except
    /// `catalog.uri`, `catalog.type`, `catalog.name` and `catalog.header`, with the
    /// `catalog.` prefix removed from the key.
    #[serde(skip)]
    pub java_catalog_props: HashMap<String, String>,

    /// Optional `partition_by` option, stored verbatim (not interpreted in this struct).
    #[serde(default)]
    pub partition_by: Option<String>,

    /// Optional `order_key` option, stored verbatim (not interpreted in this struct).
    #[serde(default)]
    pub order_key: Option<String>,

    /// Parsed from a string; defaults to `iceberg_default_commit_checkpoint_interval()`.
    /// `from_btreemap` rejects `0`. `allow_alter_on_fly`.
    #[serde(default = "iceberg_default_commit_checkpoint_interval")]
    #[serde_as(as = "DisplayFromStr")]
    #[with_option(allow_alter_on_fly)]
    pub commit_checkpoint_interval: u64,

    /// Parsed from a string boolean, default `false`.
    #[serde(default, deserialize_with = "deserialize_bool_from_string")]
    pub create_table_if_not_exists: bool,

    /// Parsed from a string; defaults to `Some(true)` when the option is absent.
    #[serde(default = "default_some_true")]
    #[serde_as(as = "Option<DisplayFromStr>")]
    pub is_exactly_once: Option<bool>,
    /// Defaults to 8.
    // NOTE(review): presumably the number of retries for a failed commit — confirm at the use site.
    #[serde(default = "default_commit_retry_num")]
    pub commit_retry_num: u32,

    /// Parsed from a string boolean, default `false`. When `true` and
    /// `compaction.max_snapshots_num` is absent, `from_btreemap` sets
    /// `max_snapshots_num_before_compaction` to `DEFAULT_COMPACTION_MAX_SNAPSHOTS_NUM`.
    /// `allow_alter_on_fly`.
    #[serde(
        rename = "enable_compaction",
        default,
        deserialize_with = "deserialize_bool_from_string"
    )]
    #[with_option(allow_alter_on_fly)]
    pub enable_compaction: bool,

    /// Compaction interval in seconds; the accessor falls back to 3600. `allow_alter_on_fly`.
    #[serde(rename = "compaction_interval_sec", default)]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    pub compaction_interval_sec: Option<u64>,

    /// Parsed from a string boolean, default `true`. `allow_alter_on_fly`.
    #[serde(
        rename = "enable_snapshot_expiration",
        default = "default_true",
        deserialize_with = "deserialize_bool_from_string"
    )]
    #[with_option(allow_alter_on_fly)]
    pub enable_snapshot_expiration: bool,

    /// Write mode, default merge-on-read. Copy-on-write is rejected for append-only sinks.
    #[serde(rename = "write_mode", default = "default_iceberg_write_mode")]
    pub write_mode: IcebergWriteMode,

    /// Table format version (1, 2 or 3, as integer or string), default 2.
    #[serde(
        rename = "format_version",
        default = "default_iceberg_format_version",
        deserialize_with = "deserialize_format_version"
    )]
    pub format_version: FormatVersion,

    /// Maximum snapshot age in milliseconds; `snapshot_expiration_timestamp_ms` subtracts it
    /// from the current time. `allow_alter_on_fly`.
    #[serde(rename = "snapshot_expiration_max_age_millis", default)]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    pub snapshot_expiration_max_age_millis: Option<i64>,

    /// Optional `snapshot_expiration_retain_last` option, parsed from a string.
    /// `allow_alter_on_fly`.
    #[serde(rename = "snapshot_expiration_retain_last", default)]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    pub snapshot_expiration_retain_last: Option<i32>,

    /// Parsed from a string boolean, default `true`. `allow_alter_on_fly`.
    #[serde(
        rename = "snapshot_expiration_clear_expired_files",
        default = "default_true",
        deserialize_with = "deserialize_bool_from_string"
    )]
    #[with_option(allow_alter_on_fly)]
    pub snapshot_expiration_clear_expired_files: bool,

    /// Parsed from a string boolean, default `true`. `allow_alter_on_fly`.
    #[serde(
        rename = "snapshot_expiration_clear_expired_meta_data",
        default = "default_true",
        deserialize_with = "deserialize_bool_from_string"
    )]
    #[with_option(allow_alter_on_fly)]
    pub snapshot_expiration_clear_expired_meta_data: bool,

    /// Option `compaction.max_snapshots_num`; `from_btreemap` rejects `0`.
    /// `allow_alter_on_fly`.
    #[serde(rename = "compaction.max_snapshots_num", default)]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    pub max_snapshots_num_before_compaction: Option<usize>,

    /// Option `compaction.small_files_threshold_mb`; the accessor falls back to 64.
    /// `allow_alter_on_fly`.
    #[serde(rename = "compaction.small_files_threshold_mb", default)]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    pub small_files_threshold_mb: Option<u64>,

    /// Option `compaction.delete_files_count_threshold`; the accessor falls back to 256.
    /// `allow_alter_on_fly`.
    #[serde(rename = "compaction.delete_files_count_threshold", default)]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    pub delete_files_count_threshold: Option<usize>,

    /// Option `compaction.trigger_snapshot_count`; `from_btreemap` rejects `0` and the
    /// accessor falls back to `usize::MAX`. `allow_alter_on_fly`.
    #[serde(rename = "compaction.trigger_snapshot_count", default)]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    pub trigger_snapshot_count: Option<usize>,

    /// Option `compaction.target_file_size_mb`; the accessor falls back to 1024.
    /// `allow_alter_on_fly`.
    #[serde(rename = "compaction.target_file_size_mb", default)]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    pub target_file_size_mb: Option<u64>,

    /// Option `compaction.type`; the accessor falls back to `CompactionType::default()`.
    /// `allow_alter_on_fly`.
    #[serde(rename = "compaction.type", default)]
    #[with_option(allow_alter_on_fly)]
    pub compaction_type: Option<CompactionType>,

    /// Option `compaction.write_parquet_compression`; the accessor falls back to `"zstd"`.
    /// `allow_alter_on_fly`.
    #[serde(rename = "compaction.write_parquet_compression", default)]
    #[with_option(allow_alter_on_fly)]
    pub write_parquet_compression: Option<String>,

    /// Option `compaction.write_parquet_max_row_group_rows`. `from_btreemap` logs that it is
    /// deprecated and ignored when set. `allow_alter_on_fly`.
    #[serde(rename = "compaction.write_parquet_max_row_group_rows", default)]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    pub write_parquet_max_row_group_rows: Option<usize>,

    /// Option `compaction.write_parquet_max_row_group_bytes`; the accessor falls back to
    /// `ICEBERG_DEFAULT_WRITE_PARQUET_MAX_ROW_GROUP_BYTES`. `allow_alter_on_fly`.
    #[serde(rename = "compaction.write_parquet_max_row_group_bytes", default)]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    pub write_parquet_max_row_group_bytes: Option<usize>,

    /// Parsed from a string boolean, default `false`. Only valid for upsert sinks in
    /// merge-on-read mode with format version >= 3 and without `force_append_only`
    /// (see `validate_enable_pk_index`).
    #[serde(
        rename = "enable_pk_index",
        default,
        deserialize_with = "deserialize_bool_from_string"
    )]
    pub enable_pk_index: bool,
}
464
465impl EnforceSecret for IcebergConfig {
466 fn enforce_secret<'a>(
467 prop_iter: impl Iterator<Item = &'a str>,
468 ) -> crate::error::ConnectorResult<()> {
469 for prop in prop_iter {
470 IcebergCommon::enforce_one(prop)?;
471 }
472 Ok(())
473 }
474
475 fn enforce_one(prop: &str) -> crate::error::ConnectorResult<()> {
476 IcebergCommon::enforce_one(prop)
477 }
478}
479
480impl IcebergConfig {
481 pub fn validate_append_only_write_mode(
484 sink_type: &str,
485 write_mode: IcebergWriteMode,
486 ) -> Result<()> {
487 if sink_type == SINK_TYPE_APPEND_ONLY && write_mode == IcebergWriteMode::CopyOnWrite {
488 return Err(SinkError::Config(anyhow!(
489 "'copy-on-write' mode is not supported for append-only iceberg sink. \
490 Please use 'merge-on-read' instead, which is strictly better for append-only workloads."
491 )));
492 }
493 Ok(())
494 }
495
496 pub(crate) fn validate_enable_pk_index(&self) -> Result<()> {
497 if !self.enable_pk_index {
498 return Ok(());
499 }
500
501 if self.r#type != SINK_TYPE_UPSERT {
502 return Err(SinkError::Config(anyhow!(
503 "`enable_pk_index` is only supported for upsert iceberg sink"
504 )));
505 }
506
507 if self.write_mode != IcebergWriteMode::MergeOnRead {
508 return Err(SinkError::Config(anyhow!(
509 "`enable_pk_index` is only supported for upsert iceberg sink with merge-on-read mode"
510 )));
511 }
512
513 if self.format_version < FormatVersion::V3 {
514 return Err(SinkError::Config(anyhow!(
515 "`enable_pk_index` is only supported for upsert iceberg sink with format version >= 3"
516 )));
517 }
518
519 if self.force_append_only {
520 return Err(SinkError::Config(anyhow!(
521 "`enable_pk_index` cannot be true when `force_append_only` is true"
522 )));
523 }
524
525 Ok(())
526 }
527
528 pub fn from_btreemap(values: BTreeMap<String, String>) -> Result<Self> {
529 let mut config =
530 serde_json::from_value::<IcebergConfig>(serde_json::to_value(&values).unwrap())
531 .map_err(|e| SinkError::Config(anyhow!(e)))?;
532
533 if config.enable_compaction && !values.contains_key(COMPACTION_MAX_SNAPSHOTS_NUM) {
534 config.max_snapshots_num_before_compaction = Some(DEFAULT_COMPACTION_MAX_SNAPSHOTS_NUM);
535 }
536
537 if config.r#type != SINK_TYPE_APPEND_ONLY && config.r#type != SINK_TYPE_UPSERT {
538 return Err(SinkError::Config(anyhow!(
539 "`{}` must be {}, or {}",
540 SINK_TYPE_OPTION,
541 SINK_TYPE_APPEND_ONLY,
542 SINK_TYPE_UPSERT
543 )));
544 }
545
546 if config.r#type == SINK_TYPE_UPSERT {
547 if let Some(primary_key) = &config.primary_key {
548 if primary_key.is_empty() {
549 return Err(SinkError::Config(anyhow!(
550 "`primary-key` must not be empty in {}",
551 SINK_TYPE_UPSERT
552 )));
553 }
554 } else {
555 return Err(SinkError::Config(anyhow!(
556 "Must set `primary-key` in {}",
557 SINK_TYPE_UPSERT
558 )));
559 }
560 }
561
562 Self::validate_append_only_write_mode(&config.r#type, config.write_mode)?;
564 config.validate_enable_pk_index()?;
565
566 config.java_catalog_props = values
568 .iter()
569 .filter(|(k, _v)| {
570 k.starts_with("catalog.")
571 && k != &"catalog.uri"
572 && k != &"catalog.type"
573 && k != &"catalog.name"
574 && k != &"catalog.header"
575 })
576 .map(|(k, v)| (k[8..].to_string(), v.clone()))
577 .collect();
578
579 if config.commit_checkpoint_interval == 0 {
580 return Err(SinkError::Config(anyhow!(
581 "`commit-checkpoint-interval` must be greater than 0"
582 )));
583 }
584
585 if config.trigger_snapshot_count == Some(0) {
586 return Err(SinkError::Config(anyhow!(
587 "`compaction.trigger_snapshot_count` must be greater than 0"
588 )));
589 }
590
591 if config.max_snapshots_num_before_compaction == Some(0) {
592 return Err(SinkError::Config(anyhow!(
593 "`compaction.max_snapshots_num` must be greater than 0"
594 )));
595 }
596
597 config
599 .table
600 .validate()
601 .map_err(|e| SinkError::Config(anyhow!(e)))?;
602
603 if config.write_parquet_max_row_group_rows.is_some() {
604 tracing::warn!(
605 "`compaction.write_parquet_max_row_group_rows` is deprecated and ignored; use `compaction.write_parquet_max_row_group_bytes` instead"
606 );
607 }
608
609 Ok(config)
610 }
611
612 pub fn catalog_type(&self) -> &str {
613 self.common.catalog_type()
614 }
615
616 pub async fn load_table(&self) -> Result<Table> {
617 #[cfg(any(test, madsim))]
618 if self.catalog_type() == "mock_v3" {
619 let catalog =
620 crate::sink::iceberg::mock_v3_catalog_registry::get().ok_or_else(|| {
621 SinkError::Config(anyhow!(
622 "mock_v3 catalog_type set but no mock catalog registered"
623 ))
624 })?;
625 let table_id = self
626 .table
627 .to_table_ident()
628 .map_err(|e| SinkError::Config(anyhow!(e).context("Unable to parse table name")))?;
629 let table = catalog
630 .load_table(&table_id)
631 .await
632 .map_err(|e| SinkError::Config(anyhow!(e).context("Failed to load mock table")))?;
633 return Ok(table);
634 }
635 self.common
636 .load_table(&self.table, &self.java_catalog_props)
637 .await
638 .map_err(Into::into)
639 }
640
641 pub async fn create_catalog(&self) -> Result<Arc<dyn Catalog>> {
642 #[cfg(any(test, madsim))]
643 if self.catalog_type() == "mock_v3" {
644 return Ok(
645 crate::sink::iceberg::mock_v3_catalog_registry::get().ok_or_else(|| {
646 anyhow::anyhow!("mock_v3 catalog_type set but no mock catalog registered")
647 })?,
648 );
649 }
650 self.common
651 .create_catalog(&self.java_catalog_props)
652 .await
653 .map_err(Into::into)
654 }
655
656 pub fn full_table_name(&self) -> Result<TableIdent> {
657 self.table.to_table_ident().map_err(Into::into)
658 }
659
660 pub fn catalog_name(&self) -> String {
661 self.common.catalog_name()
662 }
663
664 pub fn table_format_version(&self) -> FormatVersion {
665 self.format_version
666 }
667
668 pub fn compaction_interval_sec(&self) -> u64 {
669 self.compaction_interval_sec.unwrap_or(3600)
671 }
672
673 pub fn snapshot_expiration_timestamp_ms(&self, current_time_ms: i64) -> Option<i64> {
676 self.snapshot_expiration_max_age_millis
677 .map(|max_age_millis| current_time_ms - max_age_millis)
678 }
679
680 pub fn trigger_snapshot_count(&self) -> usize {
681 self.trigger_snapshot_count.unwrap_or(usize::MAX)
682 }
683
684 pub fn small_files_threshold_mb(&self) -> u64 {
685 self.small_files_threshold_mb.unwrap_or(64)
686 }
687
688 pub fn delete_files_count_threshold(&self) -> usize {
689 self.delete_files_count_threshold.unwrap_or(256)
690 }
691
692 pub fn target_file_size_mb(&self) -> u64 {
693 self.target_file_size_mb.unwrap_or(1024)
694 }
695
696 pub fn compaction_type(&self) -> CompactionType {
699 self.compaction_type.unwrap_or_default()
700 }
701
702 pub fn write_parquet_compression(&self) -> &str {
705 self.write_parquet_compression.as_deref().unwrap_or("zstd")
706 }
707
708 pub fn write_parquet_max_row_group_rows(&self) -> Option<usize> {
710 self.write_parquet_max_row_group_rows
711 }
712
713 pub fn write_parquet_max_row_group_bytes(&self) -> Option<usize> {
715 self.write_parquet_max_row_group_bytes
716 .or(Some(ICEBERG_DEFAULT_WRITE_PARQUET_MAX_ROW_GROUP_BYTES))
717 }
718
719 pub fn get_parquet_compression(&self) -> Compression {
722 parse_parquet_compression(self.write_parquet_compression())
723 }
724}
725
726pub fn parse_parquet_compression(codec: &str) -> Compression {
728 match codec.to_lowercase().as_str() {
729 "uncompressed" => Compression::UNCOMPRESSED,
730 "snappy" => Compression::SNAPPY,
731 "gzip" => Compression::GZIP(Default::default()),
732 "lzo" => Compression::LZO,
733 "brotli" => Compression::BROTLI(Default::default()),
734 "lz4" => Compression::LZ4,
735 "zstd" => Compression::ZSTD(Default::default()),
736 _ => {
737 tracing::warn!(
738 "Unknown compression codec '{}', falling back to SNAPPY",
739 codec
740 );
741 Compression::SNAPPY
742 }
743 }
744}
745
746pub fn commit_branch(sink_type: &str, write_mode: IcebergWriteMode) -> String {
749 if should_enable_iceberg_cow(sink_type, write_mode) {
750 ICEBERG_COW_BRANCH.to_owned()
751 } else {
752 MAIN_BRANCH.to_owned()
753 }
754}
755
756pub fn should_enable_iceberg_cow(sink_type: &str, write_mode: IcebergWriteMode) -> bool {
757 sink_type == SINK_TYPE_UPSERT && write_mode == IcebergWriteMode::CopyOnWrite
758}
759
// Empty `WithOptions` impls: nothing is overridden, so only the trait's provided items apply.
// NOTE(review): presumably required because these types appear as fields of `IcebergConfig`,
// which derives `WithOptions` — confirm against the derive macro's requirements.
impl crate::with_options::WithOptions for IcebergWriteMode {}

impl crate::with_options::WithOptions for FormatVersion {}

impl crate::with_options::WithOptions for CompactionType {}