1use std::collections::{BTreeMap, HashMap};
16use std::fmt::Debug;
17use std::sync::Arc;
18
19use anyhow::anyhow;
20use iceberg::spec::{FormatVersion, MAIN_BRANCH};
21use iceberg::table::Table;
22use iceberg::{Catalog, TableIdent};
23use parquet::basic::Compression;
24use serde::de::{self, Deserializer, Visitor};
25use serde::{Deserialize, Serialize};
26use serde_with::{DisplayFromStr, serde_as};
27use with_options::WithOptions;
28
29use super::{SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT, SinkError};
30use crate::connector_common::{IcebergCommon, IcebergTableIdentifier};
31use crate::enforce_secret::EnforceSecret;
32use crate::sink::Result;
33use crate::sink::decouple_checkpoint_log_sink::iceberg_default_commit_checkpoint_interval;
34use crate::{deserialize_bool_from_string, deserialize_optional_string_seq_from_string};
35
/// Branch name returned by `commit_branch` when `should_enable_iceberg_cow` is true.
pub const ICEBERG_COW_BRANCH: &str = "ingestion";

/// Option string of [`IcebergWriteMode::MergeOnRead`].
pub const ICEBERG_WRITE_MODE_MERGE_ON_READ: &str = "merge-on-read";
/// Option string of [`IcebergWriteMode::CopyOnWrite`].
pub const ICEBERG_WRITE_MODE_COPY_ON_WRITE: &str = "copy-on-write";
/// Option string of [`CompactionType::Full`].
pub const ICEBERG_COMPACTION_TYPE_FULL: &str = "full";
/// Option string of [`CompactionType::SmallFiles`].
pub const ICEBERG_COMPACTION_TYPE_SMALL_FILES: &str = "small-files";
/// Option string of [`CompactionType::FilesWithDelete`].
pub const ICEBERG_COMPACTION_TYPE_FILES_WITH_DELETE: &str = "files-with-delete";

// NOTE(review): not used in this file; presumably the first id handed out for
// partition data fields — confirm against the callers.
pub const PARTITION_DATA_ID_START: i32 = 1000;
45
/// Write mode of the iceberg sink.
///
/// Serde uses the kebab-case spelling of each variant (`merge-on-read`,
/// `copy-on-write`), which matches the strings returned by `as_str`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
pub enum IcebergWriteMode {
    /// The default mode, spelled `merge-on-read`.
    #[default]
    MergeOnRead,
    /// Spelled `copy-on-write`; rejected for append-only sinks by
    /// `IcebergConfig::validate_append_only_write_mode`.
    CopyOnWrite,
}
53
54impl IcebergWriteMode {
55 pub fn as_str(self) -> &'static str {
56 match self {
57 IcebergWriteMode::MergeOnRead => ICEBERG_WRITE_MODE_MERGE_ON_READ,
58 IcebergWriteMode::CopyOnWrite => ICEBERG_WRITE_MODE_COPY_ON_WRITE,
59 }
60 }
61}
62
63impl std::str::FromStr for IcebergWriteMode {
64 type Err = SinkError;
65
66 fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
67 match s {
68 ICEBERG_WRITE_MODE_MERGE_ON_READ => Ok(IcebergWriteMode::MergeOnRead),
69 ICEBERG_WRITE_MODE_COPY_ON_WRITE => Ok(IcebergWriteMode::CopyOnWrite),
70 _ => Err(SinkError::Config(anyhow!(format!(
71 "invalid write_mode: {}, must be one of: {}, {}",
72 s, ICEBERG_WRITE_MODE_MERGE_ON_READ, ICEBERG_WRITE_MODE_COPY_ON_WRITE
73 )))),
74 }
75 }
76}
77
78impl TryFrom<&str> for IcebergWriteMode {
79 type Error = <Self as std::str::FromStr>::Err;
80
81 fn try_from(value: &str) -> std::result::Result<Self, Self::Error> {
82 value.parse()
83 }
84}
85
86impl TryFrom<String> for IcebergWriteMode {
87 type Error = <Self as std::str::FromStr>::Err;
88
89 fn try_from(value: String) -> std::result::Result<Self, Self::Error> {
90 value.as_str().parse()
91 }
92}
93
94impl std::fmt::Display for IcebergWriteMode {
95 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
96 f.write_str(self.as_str())
97 }
98}
99
// Option keys. Each string equals the serde name of the `IcebergConfig` field
// mentioned in its comment.

/// Key of `IcebergConfig::enable_compaction`.
pub const ENABLE_COMPACTION: &str = "enable_compaction";
/// Key of `IcebergConfig::compaction_interval_sec`.
pub const COMPACTION_INTERVAL_SEC: &str = "compaction_interval_sec";
/// Key of `IcebergConfig::enable_snapshot_expiration`.
pub const ENABLE_SNAPSHOT_EXPIRATION: &str = "enable_snapshot_expiration";
/// Key of `IcebergConfig::write_mode`.
pub const WRITE_MODE: &str = "write_mode";
/// Key of `IcebergConfig::format_version`.
pub const FORMAT_VERSION: &str = "format_version";
/// Key of `IcebergConfig::snapshot_expiration_retain_last`.
pub const SNAPSHOT_EXPIRATION_RETAIN_LAST: &str = "snapshot_expiration_retain_last";
/// Key of `IcebergConfig::snapshot_expiration_max_age_millis`.
pub const SNAPSHOT_EXPIRATION_MAX_AGE_MILLIS: &str = "snapshot_expiration_max_age_millis";
/// Key of `IcebergConfig::snapshot_expiration_clear_expired_files`.
pub const SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_FILES: &str = "snapshot_expiration_clear_expired_files";
/// Key of `IcebergConfig::snapshot_expiration_clear_expired_meta_data`.
pub const SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_META_DATA: &str =
    "snapshot_expiration_clear_expired_meta_data";
/// Key of `IcebergConfig::max_snapshots_num_before_compaction`.
pub const COMPACTION_MAX_SNAPSHOTS_NUM: &str = "compaction.max_snapshots_num";

/// Key of `IcebergConfig::small_files_threshold_mb`.
pub const COMPACTION_SMALL_FILES_THRESHOLD_MB: &str = "compaction.small_files_threshold_mb";

/// Key of `IcebergConfig::delete_files_count_threshold`.
pub const COMPACTION_DELETE_FILES_COUNT_THRESHOLD: &str = "compaction.delete_files_count_threshold";

/// Key of `IcebergConfig::trigger_snapshot_count`.
pub const COMPACTION_TRIGGER_SNAPSHOT_COUNT: &str = "compaction.trigger_snapshot_count";

/// Key of `IcebergConfig::target_file_size_mb`.
pub const COMPACTION_TARGET_FILE_SIZE_MB: &str = "compaction.target_file_size_mb";

/// Key of `IcebergConfig::compaction_type`.
pub const COMPACTION_TYPE: &str = "compaction.type";

/// Key of `IcebergConfig::write_parquet_compression`.
pub const COMPACTION_WRITE_PARQUET_COMPRESSION: &str = "compaction.write_parquet_compression";
/// Key of `IcebergConfig::write_parquet_max_row_group_rows`, which `from_btreemap`
/// reports as deprecated and ignored.
pub const COMPACTION_WRITE_PARQUET_MAX_ROW_GROUP_ROWS: &str =
    "compaction.write_parquet_max_row_group_rows";
/// Key of `IcebergConfig::write_parquet_max_row_group_bytes`.
pub const COMPACTION_WRITE_PARQUET_MAX_ROW_GROUP_BYTES: &str =
    "compaction.write_parquet_max_row_group_bytes";
/// Key of `IcebergConfig::order_key`.
pub const ORDER_KEY: &str = "order_key";
/// Fallback (128 MiB) returned by `IcebergConfig::write_parquet_max_row_group_bytes`
/// when the option is unset.
pub const ICEBERG_DEFAULT_WRITE_PARQUET_MAX_ROW_GROUP_BYTES: usize = 128 * 1024 * 1024;
/// Key of `IcebergConfig::enable_pk_index`.
pub const ENABLE_PK_INDEX: &str = "enable_pk_index";

// "risingwave version <crate version>", built at compile time.
// NOTE(review): presumably written as the parquet `created_by` string — the
// use site is outside this file, confirm there.
pub(super) const PARQUET_CREATED_BY: &str =
    concat!("risingwave version ", env!("CARGO_PKG_VERSION"));
134
/// Serde default for `IcebergConfig::commit_retry_num`.
fn default_commit_retry_num() -> u32 {
    const DEFAULT_COMMIT_RETRY_NUM: u32 = 8;
    DEFAULT_COMMIT_RETRY_NUM
}
138
139fn default_iceberg_write_mode() -> IcebergWriteMode {
140 IcebergWriteMode::MergeOnRead
141}
142
143fn default_iceberg_format_version() -> FormatVersion {
144 FormatVersion::V2
145}
146
/// Serde default for boolean options that are enabled unless set otherwise.
fn default_true() -> bool {
    const ENABLED: bool = true;
    ENABLED
}
150
/// Serde default for optional boolean options that are enabled unless set otherwise.
fn default_some_true() -> Option<bool> {
    Option::from(true)
}
154
155fn parse_format_version_str(value: &str) -> std::result::Result<FormatVersion, String> {
156 let parsed = value
157 .trim()
158 .parse::<u8>()
159 .map_err(|_| "`format-version` must be one of 1, 2, or 3".to_owned())?;
160 match parsed {
161 1 => Ok(FormatVersion::V1),
162 2 => Ok(FormatVersion::V2),
163 3 => Ok(FormatVersion::V3),
164 _ => Err("`format-version` must be one of 1, 2, or 3".to_owned()),
165 }
166}
167
168fn deserialize_format_version<'de, D>(
169 deserializer: D,
170) -> std::result::Result<FormatVersion, D::Error>
171where
172 D: Deserializer<'de>,
173{
174 struct FormatVersionVisitor;
175
176 impl<'de> Visitor<'de> for FormatVersionVisitor {
177 type Value = FormatVersion;
178
179 fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
180 formatter.write_str("format-version as 1, 2, or 3")
181 }
182
183 fn visit_u64<E>(self, value: u64) -> std::result::Result<Self::Value, E>
184 where
185 E: de::Error,
186 {
187 let value = u8::try_from(value)
188 .map_err(|_| E::custom("`format-version` must be one of 1, 2, or 3"))?;
189 parse_format_version_str(&value.to_string()).map_err(E::custom)
190 }
191
192 fn visit_i64<E>(self, value: i64) -> std::result::Result<Self::Value, E>
193 where
194 E: de::Error,
195 {
196 let value = u8::try_from(value)
197 .map_err(|_| E::custom("`format-version` must be one of 1, 2, or 3"))?;
198 parse_format_version_str(&value.to_string()).map_err(E::custom)
199 }
200
201 fn visit_str<E>(self, value: &str) -> std::result::Result<Self::Value, E>
202 where
203 E: de::Error,
204 {
205 parse_format_version_str(value).map_err(E::custom)
206 }
207
208 fn visit_string<E>(self, value: String) -> std::result::Result<Self::Value, E>
209 where
210 E: de::Error,
211 {
212 self.visit_str(&value)
213 }
214 }
215
216 deserializer.deserialize_any(FormatVersionVisitor)
217}
218
/// Compaction type selected by the `compaction.type` option.
///
/// Serde uses the kebab-case spelling of each variant, which matches the
/// strings returned by `as_str`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(rename_all = "kebab-case")]
pub enum CompactionType {
    /// The default, spelled `full`.
    #[default]
    Full,
    /// Spelled `small-files`.
    SmallFiles,
    /// Spelled `files-with-delete`.
    FilesWithDelete,
}
231
232impl CompactionType {
233 pub fn as_str(&self) -> &'static str {
234 match self {
235 CompactionType::Full => ICEBERG_COMPACTION_TYPE_FULL,
236 CompactionType::SmallFiles => ICEBERG_COMPACTION_TYPE_SMALL_FILES,
237 CompactionType::FilesWithDelete => ICEBERG_COMPACTION_TYPE_FILES_WITH_DELETE,
238 }
239 }
240}
241
242impl std::str::FromStr for CompactionType {
243 type Err = SinkError;
244
245 fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
246 match s {
247 ICEBERG_COMPACTION_TYPE_FULL => Ok(CompactionType::Full),
248 ICEBERG_COMPACTION_TYPE_SMALL_FILES => Ok(CompactionType::SmallFiles),
249 ICEBERG_COMPACTION_TYPE_FILES_WITH_DELETE => Ok(CompactionType::FilesWithDelete),
250 _ => Err(SinkError::Config(anyhow!(format!(
251 "invalid compaction_type: {}, must be one of: {}, {}, {}",
252 s,
253 ICEBERG_COMPACTION_TYPE_FULL,
254 ICEBERG_COMPACTION_TYPE_SMALL_FILES,
255 ICEBERG_COMPACTION_TYPE_FILES_WITH_DELETE
256 )))),
257 }
258 }
259}
260
261impl TryFrom<&str> for CompactionType {
262 type Error = <Self as std::str::FromStr>::Err;
263
264 fn try_from(value: &str) -> std::result::Result<Self, Self::Error> {
265 value.parse()
266 }
267}
268
269impl TryFrom<String> for CompactionType {
270 type Error = <Self as std::str::FromStr>::Err;
271
272 fn try_from(value: String) -> std::result::Result<Self, Self::Error> {
273 value.as_str().parse()
274 }
275}
276
277impl std::fmt::Display for CompactionType {
278 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
279 write!(f, "{}", self.as_str())
280 }
281}
282
283#[serde_as]
284#[derive(Debug, Clone, PartialEq, Eq, Deserialize, WithOptions)]
285pub struct IcebergConfig {
286 pub r#type: String, #[serde(default, deserialize_with = "deserialize_bool_from_string")]
289 pub force_append_only: bool,
290
291 #[serde(flatten)]
292 pub(crate) common: IcebergCommon,
293
294 #[serde(flatten)]
295 pub(crate) table: IcebergTableIdentifier,
296
297 #[serde(
298 rename = "primary_key",
299 default,
300 deserialize_with = "deserialize_optional_string_seq_from_string"
301 )]
302 pub primary_key: Option<Vec<String>>,
303
304 #[serde(skip)]
306 pub java_catalog_props: HashMap<String, String>,
307
308 #[serde(default)]
309 pub partition_by: Option<String>,
310
311 #[serde(default)]
312 pub order_key: Option<String>,
313
314 #[serde(default = "iceberg_default_commit_checkpoint_interval")]
316 #[serde_as(as = "DisplayFromStr")]
317 #[with_option(allow_alter_on_fly)]
318 pub commit_checkpoint_interval: u64,
319
320 #[serde(default, deserialize_with = "deserialize_bool_from_string")]
321 pub create_table_if_not_exists: bool,
322
323 #[serde(default = "default_some_true")]
325 #[serde_as(as = "Option<DisplayFromStr>")]
326 pub is_exactly_once: Option<bool>,
327 #[serde(default = "default_commit_retry_num")]
332 pub commit_retry_num: u32,
333
334 #[serde(
336 rename = "enable_compaction",
337 default,
338 deserialize_with = "deserialize_bool_from_string"
339 )]
340 #[with_option(allow_alter_on_fly)]
341 pub enable_compaction: bool,
342
343 #[serde(rename = "compaction_interval_sec", default)]
345 #[serde_as(as = "Option<DisplayFromStr>")]
346 #[with_option(allow_alter_on_fly)]
347 pub compaction_interval_sec: Option<u64>,
348
349 #[serde(
351 rename = "enable_snapshot_expiration",
352 default = "default_true",
353 deserialize_with = "deserialize_bool_from_string"
354 )]
355 #[with_option(allow_alter_on_fly)]
356 pub enable_snapshot_expiration: bool,
357
358 #[serde(rename = "write_mode", default = "default_iceberg_write_mode")]
360 pub write_mode: IcebergWriteMode,
361
362 #[serde(
364 rename = "format_version",
365 default = "default_iceberg_format_version",
366 deserialize_with = "deserialize_format_version"
367 )]
368 pub format_version: FormatVersion,
369
370 #[serde(rename = "snapshot_expiration_max_age_millis", default)]
373 #[serde_as(as = "Option<DisplayFromStr>")]
374 #[with_option(allow_alter_on_fly)]
375 pub snapshot_expiration_max_age_millis: Option<i64>,
376
377 #[serde(rename = "snapshot_expiration_retain_last", default)]
379 #[serde_as(as = "Option<DisplayFromStr>")]
380 #[with_option(allow_alter_on_fly)]
381 pub snapshot_expiration_retain_last: Option<i32>,
382
383 #[serde(
384 rename = "snapshot_expiration_clear_expired_files",
385 default = "default_true",
386 deserialize_with = "deserialize_bool_from_string"
387 )]
388 #[with_option(allow_alter_on_fly)]
389 pub snapshot_expiration_clear_expired_files: bool,
390
391 #[serde(
392 rename = "snapshot_expiration_clear_expired_meta_data",
393 default = "default_true",
394 deserialize_with = "deserialize_bool_from_string"
395 )]
396 #[with_option(allow_alter_on_fly)]
397 pub snapshot_expiration_clear_expired_meta_data: bool,
398
399 #[serde(rename = "compaction.max_snapshots_num", default)]
402 #[serde_as(as = "Option<DisplayFromStr>")]
403 #[with_option(allow_alter_on_fly)]
404 pub max_snapshots_num_before_compaction: Option<usize>,
405
406 #[serde(rename = "compaction.small_files_threshold_mb", default)]
407 #[serde_as(as = "Option<DisplayFromStr>")]
408 #[with_option(allow_alter_on_fly)]
409 pub small_files_threshold_mb: Option<u64>,
410
411 #[serde(rename = "compaction.delete_files_count_threshold", default)]
412 #[serde_as(as = "Option<DisplayFromStr>")]
413 #[with_option(allow_alter_on_fly)]
414 pub delete_files_count_threshold: Option<usize>,
415
416 #[serde(rename = "compaction.trigger_snapshot_count", default)]
417 #[serde_as(as = "Option<DisplayFromStr>")]
418 #[with_option(allow_alter_on_fly)]
419 pub trigger_snapshot_count: Option<usize>,
420
421 #[serde(rename = "compaction.target_file_size_mb", default)]
422 #[serde_as(as = "Option<DisplayFromStr>")]
423 #[with_option(allow_alter_on_fly)]
424 pub target_file_size_mb: Option<u64>,
425
426 #[serde(rename = "compaction.type", default)]
429 #[with_option(allow_alter_on_fly)]
430 pub compaction_type: Option<CompactionType>,
431
432 #[serde(rename = "compaction.write_parquet_compression", default)]
436 #[with_option(allow_alter_on_fly)]
437 pub write_parquet_compression: Option<String>,
438
439 #[serde(rename = "compaction.write_parquet_max_row_group_rows", default)]
442 #[serde_as(as = "Option<DisplayFromStr>")]
443 #[with_option(allow_alter_on_fly)]
444 pub write_parquet_max_row_group_rows: Option<usize>,
445
446 #[serde(rename = "compaction.write_parquet_max_row_group_bytes", default)]
449 #[serde_as(as = "Option<DisplayFromStr>")]
450 #[with_option(allow_alter_on_fly)]
451 pub write_parquet_max_row_group_bytes: Option<usize>,
452
453 #[serde(
456 rename = "enable_pk_index",
457 default,
458 deserialize_with = "deserialize_bool_from_string"
459 )]
460 pub enable_pk_index: bool,
461}
462
463impl EnforceSecret for IcebergConfig {
464 fn enforce_secret<'a>(
465 prop_iter: impl Iterator<Item = &'a str>,
466 ) -> crate::error::ConnectorResult<()> {
467 for prop in prop_iter {
468 IcebergCommon::enforce_one(prop)?;
469 }
470 Ok(())
471 }
472
473 fn enforce_one(prop: &str) -> crate::error::ConnectorResult<()> {
474 IcebergCommon::enforce_one(prop)
475 }
476}
477
478impl IcebergConfig {
479 pub fn validate_append_only_write_mode(
482 sink_type: &str,
483 write_mode: IcebergWriteMode,
484 ) -> Result<()> {
485 if sink_type == SINK_TYPE_APPEND_ONLY && write_mode == IcebergWriteMode::CopyOnWrite {
486 return Err(SinkError::Config(anyhow!(
487 "'copy-on-write' mode is not supported for append-only iceberg sink. \
488 Please use 'merge-on-read' instead, which is strictly better for append-only workloads."
489 )));
490 }
491 Ok(())
492 }
493
494 pub(crate) fn validate_enable_pk_index(&self) -> Result<()> {
495 if !self.enable_pk_index {
496 return Ok(());
497 }
498
499 if self.r#type != SINK_TYPE_UPSERT {
500 return Err(SinkError::Config(anyhow!(
501 "`enable_pk_index` is only supported for upsert iceberg sink"
502 )));
503 }
504
505 if self.write_mode != IcebergWriteMode::MergeOnRead {
506 return Err(SinkError::Config(anyhow!(
507 "`enable_pk_index` is only supported for upsert iceberg sink with merge-on-read mode"
508 )));
509 }
510
511 if self.format_version < FormatVersion::V3 {
512 return Err(SinkError::Config(anyhow!(
513 "`enable_pk_index` is only supported for upsert iceberg sink with format version >= 3"
514 )));
515 }
516
517 if self.force_append_only {
518 return Err(SinkError::Config(anyhow!(
519 "`enable_pk_index` cannot be true when `force_append_only` is true"
520 )));
521 }
522
523 Ok(())
524 }
525
526 pub fn from_btreemap(values: BTreeMap<String, String>) -> Result<Self> {
527 let mut config =
528 serde_json::from_value::<IcebergConfig>(serde_json::to_value(&values).unwrap())
529 .map_err(|e| SinkError::Config(anyhow!(e)))?;
530
531 if config.r#type != SINK_TYPE_APPEND_ONLY && config.r#type != SINK_TYPE_UPSERT {
532 return Err(SinkError::Config(anyhow!(
533 "`{}` must be {}, or {}",
534 SINK_TYPE_OPTION,
535 SINK_TYPE_APPEND_ONLY,
536 SINK_TYPE_UPSERT
537 )));
538 }
539
540 if config.r#type == SINK_TYPE_UPSERT {
541 if let Some(primary_key) = &config.primary_key {
542 if primary_key.is_empty() {
543 return Err(SinkError::Config(anyhow!(
544 "`primary-key` must not be empty in {}",
545 SINK_TYPE_UPSERT
546 )));
547 }
548 } else {
549 return Err(SinkError::Config(anyhow!(
550 "Must set `primary-key` in {}",
551 SINK_TYPE_UPSERT
552 )));
553 }
554 }
555
556 Self::validate_append_only_write_mode(&config.r#type, config.write_mode)?;
558 config.validate_enable_pk_index()?;
559
560 config.java_catalog_props = values
562 .iter()
563 .filter(|(k, _v)| {
564 k.starts_with("catalog.")
565 && k != &"catalog.uri"
566 && k != &"catalog.type"
567 && k != &"catalog.name"
568 && k != &"catalog.header"
569 })
570 .map(|(k, v)| (k[8..].to_string(), v.clone()))
571 .collect();
572
573 if config.commit_checkpoint_interval == 0 {
574 return Err(SinkError::Config(anyhow!(
575 "`commit-checkpoint-interval` must be greater than 0"
576 )));
577 }
578
579 if config.trigger_snapshot_count == Some(0) {
580 return Err(SinkError::Config(anyhow!(
581 "`compaction.trigger_snapshot_count` must be greater than 0"
582 )));
583 }
584
585 config
587 .table
588 .validate()
589 .map_err(|e| SinkError::Config(anyhow!(e)))?;
590
591 if config.write_parquet_max_row_group_rows.is_some() {
592 tracing::warn!(
593 "`compaction.write_parquet_max_row_group_rows` is deprecated and ignored; use `compaction.write_parquet_max_row_group_bytes` instead"
594 );
595 }
596
597 Ok(config)
598 }
599
600 pub fn catalog_type(&self) -> &str {
601 self.common.catalog_type()
602 }
603
604 pub async fn load_table(&self) -> Result<Table> {
605 self.common
606 .load_table(&self.table, &self.java_catalog_props)
607 .await
608 .map_err(Into::into)
609 }
610
611 pub async fn create_catalog(&self) -> Result<Arc<dyn Catalog>> {
612 self.common
613 .create_catalog(&self.java_catalog_props)
614 .await
615 .map_err(Into::into)
616 }
617
618 pub fn full_table_name(&self) -> Result<TableIdent> {
619 self.table.to_table_ident().map_err(Into::into)
620 }
621
622 pub fn catalog_name(&self) -> String {
623 self.common.catalog_name()
624 }
625
626 pub fn table_format_version(&self) -> FormatVersion {
627 self.format_version
628 }
629
630 pub fn compaction_interval_sec(&self) -> u64 {
631 self.compaction_interval_sec.unwrap_or(3600)
633 }
634
635 pub fn snapshot_expiration_timestamp_ms(&self, current_time_ms: i64) -> Option<i64> {
638 self.snapshot_expiration_max_age_millis
639 .map(|max_age_millis| current_time_ms - max_age_millis)
640 }
641
642 pub fn trigger_snapshot_count(&self) -> usize {
643 self.trigger_snapshot_count.unwrap_or(usize::MAX)
644 }
645
646 pub fn small_files_threshold_mb(&self) -> u64 {
647 self.small_files_threshold_mb.unwrap_or(64)
648 }
649
650 pub fn delete_files_count_threshold(&self) -> usize {
651 self.delete_files_count_threshold.unwrap_or(256)
652 }
653
654 pub fn target_file_size_mb(&self) -> u64 {
655 self.target_file_size_mb.unwrap_or(1024)
656 }
657
658 pub fn compaction_type(&self) -> CompactionType {
661 self.compaction_type.unwrap_or_default()
662 }
663
664 pub fn write_parquet_compression(&self) -> &str {
667 self.write_parquet_compression.as_deref().unwrap_or("zstd")
668 }
669
670 pub fn write_parquet_max_row_group_rows(&self) -> Option<usize> {
672 self.write_parquet_max_row_group_rows
673 }
674
675 pub fn write_parquet_max_row_group_bytes(&self) -> Option<usize> {
677 self.write_parquet_max_row_group_bytes
678 .or(Some(ICEBERG_DEFAULT_WRITE_PARQUET_MAX_ROW_GROUP_BYTES))
679 }
680
681 pub fn get_parquet_compression(&self) -> Compression {
684 parse_parquet_compression(self.write_parquet_compression())
685 }
686}
687
688pub fn parse_parquet_compression(codec: &str) -> Compression {
690 match codec.to_lowercase().as_str() {
691 "uncompressed" => Compression::UNCOMPRESSED,
692 "snappy" => Compression::SNAPPY,
693 "gzip" => Compression::GZIP(Default::default()),
694 "lzo" => Compression::LZO,
695 "brotli" => Compression::BROTLI(Default::default()),
696 "lz4" => Compression::LZ4,
697 "zstd" => Compression::ZSTD(Default::default()),
698 _ => {
699 tracing::warn!(
700 "Unknown compression codec '{}', falling back to SNAPPY",
701 codec
702 );
703 Compression::SNAPPY
704 }
705 }
706}
707
708pub fn commit_branch(sink_type: &str, write_mode: IcebergWriteMode) -> String {
711 if should_enable_iceberg_cow(sink_type, write_mode) {
712 ICEBERG_COW_BRANCH.to_owned()
713 } else {
714 MAIN_BRANCH.to_owned()
715 }
716}
717
718pub fn should_enable_iceberg_cow(sink_type: &str, write_mode: IcebergWriteMode) -> bool {
719 sink_type == SINK_TYPE_UPSERT && write_mode == IcebergWriteMode::CopyOnWrite
720}
721
// Empty `WithOptions` impls for the non-primitive field types of `IcebergConfig`.
// NOTE(review): presumably required by `#[derive(WithOptions)]` on the struct —
// confirm against the derive macro.
impl crate::with_options::WithOptions for IcebergWriteMode {}

impl crate::with_options::WithOptions for FormatVersion {}

impl crate::with_options::WithOptions for CompactionType {}