1use std::collections::{BTreeMap, HashMap};
16use std::fmt::Debug;
17use std::sync::Arc;
18
19use anyhow::anyhow;
20use iceberg::spec::{FormatVersion, MAIN_BRANCH};
21use iceberg::table::Table;
22use iceberg::{Catalog, TableIdent};
23use parquet::basic::Compression;
24use serde::de::{self, Deserializer, Visitor};
25use serde::{Deserialize, Serialize};
26use serde_with::{DisplayFromStr, serde_as};
27use with_options::WithOptions;
28
29use super::{SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT, SinkError};
30use crate::connector_common::{IcebergCommon, IcebergTableIdentifier};
31use crate::enforce_secret::EnforceSecret;
32use crate::sink::Result;
33use crate::sink::decouple_checkpoint_log_sink::iceberg_default_commit_checkpoint_interval;
34use crate::{deserialize_bool_from_string, deserialize_optional_string_seq_from_string};
35
// Branch that copy-on-write sinks commit to; compaction later folds it into `main`.
pub const ICEBERG_COW_BRANCH: &str = "ingestion";

// Accepted values for the `write_mode` sink option.
pub const ICEBERG_WRITE_MODE_MERGE_ON_READ: &str = "merge-on-read";
pub const ICEBERG_WRITE_MODE_COPY_ON_WRITE: &str = "copy-on-write";
// Accepted values for the `compaction.type` sink option.
pub const ICEBERG_COMPACTION_TYPE_FULL: &str = "full";
pub const ICEBERG_COMPACTION_TYPE_SMALL_FILES: &str = "small-files";
pub const ICEBERG_COMPACTION_TYPE_FILES_WITH_DELETE: &str = "files-with-delete";

// First field ID used for partition data; presumably follows the Iceberg
// spec's convention of reserving IDs >= 1000 for partition fields — TODO confirm.
pub const PARTITION_DATA_ID_START: i32 = 1000;
/// How row-level changes are written to the Iceberg table.
///
/// Serialized in kebab-case, matching the string constants
/// `ICEBERG_WRITE_MODE_MERGE_ON_READ` / `ICEBERG_WRITE_MODE_COPY_ON_WRITE`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
pub enum IcebergWriteMode {
    /// Write delete files alongside data files; readers merge them (default).
    #[default]
    MergeOnRead,
    /// Rewrite affected data files on write; see `ICEBERG_COW_BRANCH` usage.
    CopyOnWrite,
}
53
54impl IcebergWriteMode {
55 pub fn as_str(self) -> &'static str {
56 match self {
57 IcebergWriteMode::MergeOnRead => ICEBERG_WRITE_MODE_MERGE_ON_READ,
58 IcebergWriteMode::CopyOnWrite => ICEBERG_WRITE_MODE_COPY_ON_WRITE,
59 }
60 }
61}
62
63impl std::str::FromStr for IcebergWriteMode {
64 type Err = SinkError;
65
66 fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
67 match s {
68 ICEBERG_WRITE_MODE_MERGE_ON_READ => Ok(IcebergWriteMode::MergeOnRead),
69 ICEBERG_WRITE_MODE_COPY_ON_WRITE => Ok(IcebergWriteMode::CopyOnWrite),
70 _ => Err(SinkError::Config(anyhow!(format!(
71 "invalid write_mode: {}, must be one of: {}, {}",
72 s, ICEBERG_WRITE_MODE_MERGE_ON_READ, ICEBERG_WRITE_MODE_COPY_ON_WRITE
73 )))),
74 }
75 }
76}
77
78impl TryFrom<&str> for IcebergWriteMode {
79 type Error = <Self as std::str::FromStr>::Err;
80
81 fn try_from(value: &str) -> std::result::Result<Self, Self::Error> {
82 value.parse()
83 }
84}
85
86impl TryFrom<String> for IcebergWriteMode {
87 type Error = <Self as std::str::FromStr>::Err;
88
89 fn try_from(value: String) -> std::result::Result<Self, Self::Error> {
90 value.as_str().parse()
91 }
92}
93
94impl std::fmt::Display for IcebergWriteMode {
95 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
96 f.write_str(self.as_str())
97 }
98}
99
// --- Sink option key names -------------------------------------------------
// These constants name the `WITH (...)` options deserialized into
// `IcebergConfig`; each matches a `#[serde(rename = ...)]` on that struct.
pub const ENABLE_COMPACTION: &str = "enable_compaction";
pub const COMPACTION_INTERVAL_SEC: &str = "compaction_interval_sec";
pub const ENABLE_SNAPSHOT_EXPIRATION: &str = "enable_snapshot_expiration";
pub const WRITE_MODE: &str = "write_mode";
pub const FORMAT_VERSION: &str = "format_version";
pub const SNAPSHOT_EXPIRATION_RETAIN_LAST: &str = "snapshot_expiration_retain_last";
pub const SNAPSHOT_EXPIRATION_MAX_AGE_MILLIS: &str = "snapshot_expiration_max_age_millis";
pub const SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_FILES: &str = "snapshot_expiration_clear_expired_files";
pub const SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_META_DATA: &str =
    "snapshot_expiration_clear_expired_meta_data";
pub const COMPACTION_MAX_SNAPSHOTS_NUM: &str = "compaction.max_snapshots_num";

pub const COMPACTION_SMALL_FILES_THRESHOLD_MB: &str = "compaction.small_files_threshold_mb";

pub const COMPACTION_DELETE_FILES_COUNT_THRESHOLD: &str = "compaction.delete_files_count_threshold";

pub const COMPACTION_TRIGGER_SNAPSHOT_COUNT: &str = "compaction.trigger_snapshot_count";

pub const COMPACTION_TARGET_FILE_SIZE_MB: &str = "compaction.target_file_size_mb";

pub const COMPACTION_TYPE: &str = "compaction.type";

pub const COMPACTION_WRITE_PARQUET_COMPRESSION: &str = "compaction.write_parquet_compression";
pub const COMPACTION_WRITE_PARQUET_MAX_ROW_GROUP_ROWS: &str =
    "compaction.write_parquet_max_row_group_rows";
pub const COMPACTION_WRITE_PARQUET_MAX_ROW_GROUP_BYTES: &str =
    "compaction.write_parquet_max_row_group_bytes";
pub const COMMIT_CHECKPOINT_SIZE_THRESHOLD_MB: &str = "commit_checkpoint_size_threshold_mb";
pub const ORDER_KEY: &str = "order_key";
// Default commit-size threshold (MB) when the user does not set one.
pub const ICEBERG_DEFAULT_COMMIT_CHECKPOINT_SIZE_THRESHOLD_MB: u64 = 128;
// Default cap on a parquet row group written during compaction (128 MiB).
pub const ICEBERG_DEFAULT_WRITE_PARQUET_MAX_ROW_GROUP_BYTES: usize = 128 * 1024 * 1024;
pub const ENABLE_PK_INDEX: &str = "enable_pk_index";

// `created_by` metadata stamped into parquet files, carrying our version.
pub(super) const PARQUET_CREATED_BY: &str =
    concat!("risingwave version ", env!("CARGO_PKG_VERSION"));
136
// serde `default = "..."` helpers for `IcebergConfig` fields below.

// Default number of commit retries (`commit_retry_num`).
fn default_commit_retry_num() -> u32 {
    8
}

// Default for `commit_checkpoint_size_threshold_mb`.
fn default_commit_checkpoint_size_threshold_mb() -> Option<u64> {
    Some(ICEBERG_DEFAULT_COMMIT_CHECKPOINT_SIZE_THRESHOLD_MB)
}

// Default write mode: merge-on-read.
fn default_iceberg_write_mode() -> IcebergWriteMode {
    IcebergWriteMode::MergeOnRead
}

// Default Iceberg table format version: v2.
fn default_iceberg_format_version() -> FormatVersion {
    FormatVersion::V2
}

fn default_true() -> bool {
    true
}

fn default_some_true() -> Option<bool> {
    Some(true)
}
160
161fn parse_format_version_str(value: &str) -> std::result::Result<FormatVersion, String> {
162 let parsed = value
163 .trim()
164 .parse::<u8>()
165 .map_err(|_| "`format-version` must be one of 1, 2, or 3".to_owned())?;
166 match parsed {
167 1 => Ok(FormatVersion::V1),
168 2 => Ok(FormatVersion::V2),
169 3 => Ok(FormatVersion::V3),
170 _ => Err("`format-version` must be one of 1, 2, or 3".to_owned()),
171 }
172}
173
174fn deserialize_format_version<'de, D>(
175 deserializer: D,
176) -> std::result::Result<FormatVersion, D::Error>
177where
178 D: Deserializer<'de>,
179{
180 struct FormatVersionVisitor;
181
182 impl<'de> Visitor<'de> for FormatVersionVisitor {
183 type Value = FormatVersion;
184
185 fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
186 formatter.write_str("format-version as 1, 2, or 3")
187 }
188
189 fn visit_u64<E>(self, value: u64) -> std::result::Result<Self::Value, E>
190 where
191 E: de::Error,
192 {
193 let value = u8::try_from(value)
194 .map_err(|_| E::custom("`format-version` must be one of 1, 2, or 3"))?;
195 parse_format_version_str(&value.to_string()).map_err(E::custom)
196 }
197
198 fn visit_i64<E>(self, value: i64) -> std::result::Result<Self::Value, E>
199 where
200 E: de::Error,
201 {
202 let value = u8::try_from(value)
203 .map_err(|_| E::custom("`format-version` must be one of 1, 2, or 3"))?;
204 parse_format_version_str(&value.to_string()).map_err(E::custom)
205 }
206
207 fn visit_str<E>(self, value: &str) -> std::result::Result<Self::Value, E>
208 where
209 E: de::Error,
210 {
211 parse_format_version_str(value).map_err(E::custom)
212 }
213
214 fn visit_string<E>(self, value: String) -> std::result::Result<Self::Value, E>
215 where
216 E: de::Error,
217 {
218 self.visit_str(&value)
219 }
220 }
221
222 deserializer.deserialize_any(FormatVersionVisitor)
223}
224
/// Which compaction strategy the sink requests (`compaction.type` option).
///
/// Serialized in kebab-case, matching the `ICEBERG_COMPACTION_TYPE_*` constants.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(rename_all = "kebab-case")]
pub enum CompactionType {
    /// Rewrite everything eligible (default).
    #[default]
    Full,
    /// Target small data files only.
    SmallFiles,
    /// Target files that have associated delete files.
    FilesWithDelete,
}
237
238impl CompactionType {
239 pub fn as_str(&self) -> &'static str {
240 match self {
241 CompactionType::Full => ICEBERG_COMPACTION_TYPE_FULL,
242 CompactionType::SmallFiles => ICEBERG_COMPACTION_TYPE_SMALL_FILES,
243 CompactionType::FilesWithDelete => ICEBERG_COMPACTION_TYPE_FILES_WITH_DELETE,
244 }
245 }
246}
247
248impl std::str::FromStr for CompactionType {
249 type Err = SinkError;
250
251 fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
252 match s {
253 ICEBERG_COMPACTION_TYPE_FULL => Ok(CompactionType::Full),
254 ICEBERG_COMPACTION_TYPE_SMALL_FILES => Ok(CompactionType::SmallFiles),
255 ICEBERG_COMPACTION_TYPE_FILES_WITH_DELETE => Ok(CompactionType::FilesWithDelete),
256 _ => Err(SinkError::Config(anyhow!(format!(
257 "invalid compaction_type: {}, must be one of: {}, {}, {}",
258 s,
259 ICEBERG_COMPACTION_TYPE_FULL,
260 ICEBERG_COMPACTION_TYPE_SMALL_FILES,
261 ICEBERG_COMPACTION_TYPE_FILES_WITH_DELETE
262 )))),
263 }
264 }
265}
266
267impl TryFrom<&str> for CompactionType {
268 type Error = <Self as std::str::FromStr>::Err;
269
270 fn try_from(value: &str) -> std::result::Result<Self, Self::Error> {
271 value.parse()
272 }
273}
274
275impl TryFrom<String> for CompactionType {
276 type Error = <Self as std::str::FromStr>::Err;
277
278 fn try_from(value: String) -> std::result::Result<Self, Self::Error> {
279 value.as_str().parse()
280 }
281}
282
283impl std::fmt::Display for CompactionType {
284 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
285 write!(f, "{}", self.as_str())
286 }
287}
288
// Configuration for an Iceberg sink, deserialized from the `WITH (...)`
// options (see `IcebergConfig::from_btreemap`).
//
// NOTE(review): plain `//` comments are used on fields (not `///`) so the
// `WithOptions` derive's harvesting of doc attributes is left untouched.
#[serde_as]
#[derive(Debug, Clone, PartialEq, Eq, Deserialize, WithOptions)]
pub struct IcebergConfig {
    // Sink type: validated to be "append-only" or "upsert" in `from_btreemap`.
    pub r#type: String,

    // Force an upsert stream to be written append-only.
    #[serde(default, deserialize_with = "deserialize_bool_from_string")]
    pub force_append_only: bool,

    // Catalog / warehouse connection options (flattened into the same namespace).
    #[serde(flatten)]
    pub(crate) common: IcebergCommon,

    // Target table identifier options (flattened; validated in `from_btreemap`).
    #[serde(flatten)]
    pub(crate) table: IcebergTableIdentifier,

    // Comma-separated in the options; required and non-empty for upsert sinks.
    #[serde(
        rename = "primary_key",
        default,
        deserialize_with = "deserialize_optional_string_seq_from_string"
    )]
    pub primary_key: Option<Vec<String>>,

    // Not a user option: populated in `from_btreemap` from the remaining
    // `catalog.`-prefixed keys (prefix stripped) and passed to the Java catalog.
    #[serde(skip)]
    pub java_catalog_props: HashMap<String, String>,

    #[serde(default)]
    pub partition_by: Option<String>,

    #[serde(default)]
    pub order_key: Option<String>,

    // Commit every N checkpoints; must be > 0 (checked in `from_btreemap`).
    #[serde(default = "iceberg_default_commit_checkpoint_interval")]
    #[serde_as(as = "DisplayFromStr")]
    #[with_option(allow_alter_on_fly)]
    pub commit_checkpoint_interval: u64,

    // Commit early once buffered data exceeds this many MB; must be > 0 if set.
    #[serde(default = "default_commit_checkpoint_size_threshold_mb")]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    pub commit_checkpoint_size_threshold_mb: Option<u64>,

    #[serde(default, deserialize_with = "deserialize_bool_from_string")]
    pub create_table_if_not_exists: bool,

    // Exactly-once commit protocol toggle; defaults to `Some(true)`.
    #[serde(default = "default_some_true")]
    #[serde_as(as = "Option<DisplayFromStr>")]
    pub is_exactly_once: Option<bool>,
    // Number of times a failed commit is retried (default 8).
    #[serde(default = "default_commit_retry_num")]
    pub commit_retry_num: u32,

    #[serde(
        rename = "enable_compaction",
        default,
        deserialize_with = "deserialize_bool_from_string"
    )]
    #[with_option(allow_alter_on_fly)]
    pub enable_compaction: bool,

    // Compaction period in seconds; `compaction_interval_sec()` defaults to 3600.
    #[serde(rename = "compaction_interval_sec", default)]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    pub compaction_interval_sec: Option<u64>,

    #[serde(
        rename = "enable_snapshot_expiration",
        default,
        deserialize_with = "deserialize_bool_from_string"
    )]
    #[with_option(allow_alter_on_fly)]
    pub enable_snapshot_expiration: bool,

    // merge-on-read (default) or copy-on-write; see `IcebergWriteMode`.
    #[serde(rename = "write_mode", default = "default_iceberg_write_mode")]
    pub write_mode: IcebergWriteMode,

    // Iceberg table format version (1, 2, or 3); defaults to v2.
    #[serde(
        rename = "format_version",
        default = "default_iceberg_format_version",
        deserialize_with = "deserialize_format_version"
    )]
    pub format_version: FormatVersion,

    // Snapshots older than now - max_age are expired; see
    // `snapshot_expiration_timestamp_ms`.
    #[serde(rename = "snapshot_expiration_max_age_millis", default)]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    pub snapshot_expiration_max_age_millis: Option<i64>,

    // Minimum number of most-recent snapshots to keep during expiration.
    #[serde(rename = "snapshot_expiration_retain_last", default)]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    pub snapshot_expiration_retain_last: Option<i32>,

    #[serde(
        rename = "snapshot_expiration_clear_expired_files",
        default = "default_true",
        deserialize_with = "deserialize_bool_from_string"
    )]
    #[with_option(allow_alter_on_fly)]
    pub snapshot_expiration_clear_expired_files: bool,

    #[serde(
        rename = "snapshot_expiration_clear_expired_meta_data",
        default = "default_true",
        deserialize_with = "deserialize_bool_from_string"
    )]
    #[with_option(allow_alter_on_fly)]
    pub snapshot_expiration_clear_expired_meta_data: bool,

    #[serde(rename = "compaction.max_snapshots_num", default)]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    pub max_snapshots_num_before_compaction: Option<usize>,

    // Files below this size (MB) count as "small"; accessor defaults to 64.
    #[serde(rename = "compaction.small_files_threshold_mb", default)]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    pub small_files_threshold_mb: Option<u64>,

    // Accessor defaults to 256.
    #[serde(rename = "compaction.delete_files_count_threshold", default)]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    pub delete_files_count_threshold: Option<usize>,

    // Must be > 0 if set (checked in `from_btreemap`); accessor defaults to
    // `usize::MAX` (i.e. never triggered by count).
    #[serde(rename = "compaction.trigger_snapshot_count", default)]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    pub trigger_snapshot_count: Option<usize>,

    // Accessor defaults to 1024 MB.
    #[serde(rename = "compaction.target_file_size_mb", default)]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    pub target_file_size_mb: Option<u64>,

    // Accessor defaults to `CompactionType::Full`.
    #[serde(rename = "compaction.type", default)]
    #[with_option(allow_alter_on_fly)]
    pub compaction_type: Option<CompactionType>,

    // Codec name parsed by `parse_parquet_compression`; accessor defaults to "zstd".
    #[serde(rename = "compaction.write_parquet_compression", default)]
    #[with_option(allow_alter_on_fly)]
    pub write_parquet_compression: Option<String>,

    // Deprecated: ignored with a warning in `from_btreemap`; use the bytes
    // variant below instead.
    #[serde(rename = "compaction.write_parquet_max_row_group_rows", default)]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    pub write_parquet_max_row_group_rows: Option<usize>,

    // Accessor defaults to `ICEBERG_DEFAULT_WRITE_PARQUET_MAX_ROW_GROUP_BYTES`.
    #[serde(rename = "compaction.write_parquet_max_row_group_bytes", default)]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    pub write_parquet_max_row_group_bytes: Option<usize>,

    // Only valid for upsert + merge-on-read + format v3 and not force-append-only;
    // see `validate_enable_pk_index`.
    #[serde(
        rename = "enable_pk_index",
        default,
        deserialize_with = "deserialize_bool_from_string"
    )]
    pub enable_pk_index: bool,
}
475
476impl EnforceSecret for IcebergConfig {
477 fn enforce_secret<'a>(
478 prop_iter: impl Iterator<Item = &'a str>,
479 ) -> crate::error::ConnectorResult<()> {
480 for prop in prop_iter {
481 IcebergCommon::enforce_one(prop)?;
482 }
483 Ok(())
484 }
485
486 fn enforce_one(prop: &str) -> crate::error::ConnectorResult<()> {
487 IcebergCommon::enforce_one(prop)
488 }
489}
490
491impl IcebergConfig {
492 pub fn validate_append_only_write_mode(
495 sink_type: &str,
496 write_mode: IcebergWriteMode,
497 ) -> Result<()> {
498 if sink_type == SINK_TYPE_APPEND_ONLY && write_mode == IcebergWriteMode::CopyOnWrite {
499 return Err(SinkError::Config(anyhow!(
500 "'copy-on-write' mode is not supported for append-only iceberg sink. \
501 Please use 'merge-on-read' instead, which is strictly better for append-only workloads."
502 )));
503 }
504 Ok(())
505 }
506
507 pub(crate) fn validate_enable_pk_index(&self) -> Result<()> {
508 if !self.enable_pk_index {
509 return Ok(());
510 }
511
512 if self.r#type != SINK_TYPE_UPSERT {
513 return Err(SinkError::Config(anyhow!(
514 "`enable_pk_index` is only supported for upsert iceberg sink"
515 )));
516 }
517
518 if self.write_mode != IcebergWriteMode::MergeOnRead {
519 return Err(SinkError::Config(anyhow!(
520 "`enable_pk_index` is only supported for upsert iceberg sink with merge-on-read mode"
521 )));
522 }
523
524 if self.format_version < FormatVersion::V3 {
525 return Err(SinkError::Config(anyhow!(
526 "`enable_pk_index` is only supported for upsert iceberg sink with format version >= 3"
527 )));
528 }
529
530 if self.force_append_only {
531 return Err(SinkError::Config(anyhow!(
532 "`enable_pk_index` cannot be true when `force_append_only` is true"
533 )));
534 }
535
536 Ok(())
537 }
538
539 pub fn from_btreemap(values: BTreeMap<String, String>) -> Result<Self> {
540 let mut config =
541 serde_json::from_value::<IcebergConfig>(serde_json::to_value(&values).unwrap())
542 .map_err(|e| SinkError::Config(anyhow!(e)))?;
543
544 if config.r#type != SINK_TYPE_APPEND_ONLY && config.r#type != SINK_TYPE_UPSERT {
545 return Err(SinkError::Config(anyhow!(
546 "`{}` must be {}, or {}",
547 SINK_TYPE_OPTION,
548 SINK_TYPE_APPEND_ONLY,
549 SINK_TYPE_UPSERT
550 )));
551 }
552
553 if config.r#type == SINK_TYPE_UPSERT {
554 if let Some(primary_key) = &config.primary_key {
555 if primary_key.is_empty() {
556 return Err(SinkError::Config(anyhow!(
557 "`primary-key` must not be empty in {}",
558 SINK_TYPE_UPSERT
559 )));
560 }
561 } else {
562 return Err(SinkError::Config(anyhow!(
563 "Must set `primary-key` in {}",
564 SINK_TYPE_UPSERT
565 )));
566 }
567 }
568
569 Self::validate_append_only_write_mode(&config.r#type, config.write_mode)?;
571 config.validate_enable_pk_index()?;
572
573 config.java_catalog_props = values
575 .iter()
576 .filter(|(k, _v)| {
577 k.starts_with("catalog.")
578 && k != &"catalog.uri"
579 && k != &"catalog.type"
580 && k != &"catalog.name"
581 && k != &"catalog.header"
582 })
583 .map(|(k, v)| (k[8..].to_string(), v.clone()))
584 .collect();
585
586 if config.commit_checkpoint_interval == 0 {
587 return Err(SinkError::Config(anyhow!(
588 "`commit-checkpoint-interval` must be greater than 0"
589 )));
590 }
591
592 if config.commit_checkpoint_size_threshold_mb == Some(0) {
593 return Err(SinkError::Config(anyhow!(
594 "`commit_checkpoint_size_threshold_mb` must be greater than 0"
595 )));
596 }
597
598 if config.trigger_snapshot_count == Some(0) {
599 return Err(SinkError::Config(anyhow!(
600 "`compaction.trigger_snapshot_count` must be greater than 0"
601 )));
602 }
603
604 config
606 .table
607 .validate()
608 .map_err(|e| SinkError::Config(anyhow!(e)))?;
609
610 if config.write_parquet_max_row_group_rows.is_some() {
611 tracing::warn!(
612 "`compaction.write_parquet_max_row_group_rows` is deprecated and ignored; use `compaction.write_parquet_max_row_group_bytes` instead"
613 );
614 }
615
616 Ok(config)
617 }
618
619 pub fn catalog_type(&self) -> &str {
620 self.common.catalog_type()
621 }
622
623 pub async fn load_table(&self) -> Result<Table> {
624 self.common
625 .load_table(&self.table, &self.java_catalog_props)
626 .await
627 .map_err(Into::into)
628 }
629
630 pub async fn create_catalog(&self) -> Result<Arc<dyn Catalog>> {
631 self.common
632 .create_catalog(&self.java_catalog_props)
633 .await
634 .map_err(Into::into)
635 }
636
637 pub fn full_table_name(&self) -> Result<TableIdent> {
638 self.table.to_table_ident().map_err(Into::into)
639 }
640
641 pub fn catalog_name(&self) -> String {
642 self.common.catalog_name()
643 }
644
645 pub fn table_format_version(&self) -> FormatVersion {
646 self.format_version
647 }
648
649 pub fn compaction_interval_sec(&self) -> u64 {
650 self.compaction_interval_sec.unwrap_or(3600)
652 }
653
654 pub fn snapshot_expiration_timestamp_ms(&self, current_time_ms: i64) -> Option<i64> {
657 self.snapshot_expiration_max_age_millis
658 .map(|max_age_millis| current_time_ms - max_age_millis)
659 }
660
661 pub fn trigger_snapshot_count(&self) -> usize {
662 self.trigger_snapshot_count.unwrap_or(usize::MAX)
663 }
664
665 pub fn small_files_threshold_mb(&self) -> u64 {
666 self.small_files_threshold_mb.unwrap_or(64)
667 }
668
669 pub fn delete_files_count_threshold(&self) -> usize {
670 self.delete_files_count_threshold.unwrap_or(256)
671 }
672
673 pub fn target_file_size_mb(&self) -> u64 {
674 self.target_file_size_mb.unwrap_or(1024)
675 }
676
677 pub fn commit_checkpoint_size_threshold_bytes(&self) -> Option<u64> {
678 self.commit_checkpoint_size_threshold_mb
679 .map(|threshold_mb| threshold_mb.saturating_mul(1024 * 1024))
680 }
681
682 pub fn compaction_type(&self) -> CompactionType {
685 self.compaction_type.unwrap_or_default()
686 }
687
688 pub fn write_parquet_compression(&self) -> &str {
691 self.write_parquet_compression.as_deref().unwrap_or("zstd")
692 }
693
694 pub fn write_parquet_max_row_group_rows(&self) -> Option<usize> {
696 self.write_parquet_max_row_group_rows
697 }
698
699 pub fn write_parquet_max_row_group_bytes(&self) -> Option<usize> {
701 self.write_parquet_max_row_group_bytes
702 .or(Some(ICEBERG_DEFAULT_WRITE_PARQUET_MAX_ROW_GROUP_BYTES))
703 }
704
705 pub fn get_parquet_compression(&self) -> Compression {
708 parse_parquet_compression(self.write_parquet_compression())
709 }
710}
711
712pub fn parse_parquet_compression(codec: &str) -> Compression {
714 match codec.to_lowercase().as_str() {
715 "uncompressed" => Compression::UNCOMPRESSED,
716 "snappy" => Compression::SNAPPY,
717 "gzip" => Compression::GZIP(Default::default()),
718 "lzo" => Compression::LZO,
719 "brotli" => Compression::BROTLI(Default::default()),
720 "lz4" => Compression::LZ4,
721 "zstd" => Compression::ZSTD(Default::default()),
722 _ => {
723 tracing::warn!(
724 "Unknown compression codec '{}', falling back to SNAPPY",
725 codec
726 );
727 Compression::SNAPPY
728 }
729 }
730}
731
732pub fn commit_branch(sink_type: &str, write_mode: IcebergWriteMode) -> String {
735 if should_enable_iceberg_cow(sink_type, write_mode) {
736 ICEBERG_COW_BRANCH.to_owned()
737 } else {
738 MAIN_BRANCH.to_owned()
739 }
740}
741
742pub fn should_enable_iceberg_cow(sink_type: &str, write_mode: IcebergWriteMode) -> bool {
743 sink_type == SINK_TYPE_UPSERT && write_mode == IcebergWriteMode::CopyOnWrite
744}
745
// Marker impls so these option value types can appear as `IcebergConfig`
// fields under the `WithOptions` derive.
impl crate::with_options::WithOptions for IcebergWriteMode {}

impl crate::with_options::WithOptions for FormatVersion {}

impl crate::with_options::WithOptions for CompactionType {}