1use std::collections::{BTreeMap, HashMap};
16use std::fmt::Debug;
17use std::sync::Arc;
18
19use anyhow::anyhow;
20use iceberg::spec::{FormatVersion, MAIN_BRANCH};
21use iceberg::table::Table;
22use iceberg::{Catalog, TableIdent};
23use parquet::basic::Compression;
24use serde::de::{self, Deserializer, Visitor};
25use serde::{Deserialize, Serialize};
26use serde_with::{DisplayFromStr, serde_as};
27use with_options::WithOptions;
28
29use super::{SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT, SinkError};
30use crate::connector_common::{IcebergCommon, IcebergTableIdentifier};
31use crate::enforce_secret::EnforceSecret;
32use crate::sink::Result;
33use crate::sink::decouple_checkpoint_log_sink::iceberg_default_commit_checkpoint_interval;
34use crate::{deserialize_bool_from_string, deserialize_optional_string_seq_from_string};
35
/// Branch that copy-on-write sinks commit to instead of `main`
/// (see `commit_branch` / `should_enable_iceberg_cow` below).
pub const ICEBERG_COW_BRANCH: &str = "ingestion";

/// Accepted values for the `write_mode` sink option.
pub const ICEBERG_WRITE_MODE_MERGE_ON_READ: &str = "merge-on-read";
pub const ICEBERG_WRITE_MODE_COPY_ON_WRITE: &str = "copy-on-write";
/// Accepted values for the `compaction.type` sink option.
pub const ICEBERG_COMPACTION_TYPE_FULL: &str = "full";
pub const ICEBERG_COMPACTION_TYPE_SMALL_FILES: &str = "small-files";
pub const ICEBERG_COMPACTION_TYPE_FILES_WITH_DELETE: &str = "files-with-delete";

// First field id assigned to partition data. NOTE(review): presumably
// mirrors the Iceberg spec's reservation of partition field ids starting
// at 1000 — confirm against the spec/usage sites.
pub const PARTITION_DATA_ID_START: i32 = 1000;
45
/// Write mode of the iceberg sink, selected via the `write_mode` option.
///
/// Serialized in kebab-case (`merge-on-read` / `copy-on-write`); defaults
/// to [`IcebergWriteMode::MergeOnRead`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
pub enum IcebergWriteMode {
    /// The default mode; always allowed.
    #[default]
    MergeOnRead,
    /// Rejected for append-only sinks (see `validate_append_only_write_mode`);
    /// when combined with an upsert sink, commits go to the COW branch
    /// (see `should_enable_iceberg_cow`).
    CopyOnWrite,
}
53
54impl IcebergWriteMode {
55 pub fn as_str(self) -> &'static str {
56 match self {
57 IcebergWriteMode::MergeOnRead => ICEBERG_WRITE_MODE_MERGE_ON_READ,
58 IcebergWriteMode::CopyOnWrite => ICEBERG_WRITE_MODE_COPY_ON_WRITE,
59 }
60 }
61}
62
63impl std::str::FromStr for IcebergWriteMode {
64 type Err = SinkError;
65
66 fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
67 match s {
68 ICEBERG_WRITE_MODE_MERGE_ON_READ => Ok(IcebergWriteMode::MergeOnRead),
69 ICEBERG_WRITE_MODE_COPY_ON_WRITE => Ok(IcebergWriteMode::CopyOnWrite),
70 _ => Err(SinkError::Config(anyhow!(format!(
71 "invalid write_mode: {}, must be one of: {}, {}",
72 s, ICEBERG_WRITE_MODE_MERGE_ON_READ, ICEBERG_WRITE_MODE_COPY_ON_WRITE
73 )))),
74 }
75 }
76}
77
78impl TryFrom<&str> for IcebergWriteMode {
79 type Error = <Self as std::str::FromStr>::Err;
80
81 fn try_from(value: &str) -> std::result::Result<Self, Self::Error> {
82 value.parse()
83 }
84}
85
86impl TryFrom<String> for IcebergWriteMode {
87 type Error = <Self as std::str::FromStr>::Err;
88
89 fn try_from(value: String) -> std::result::Result<Self, Self::Error> {
90 value.as_str().parse()
91 }
92}
93
94impl std::fmt::Display for IcebergWriteMode {
95 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
96 f.write_str(self.as_str())
97 }
98}
99
// Names of the sink options below, used when referring to options by key.
pub const ENABLE_COMPACTION: &str = "enable_compaction";
pub const COMPACTION_INTERVAL_SEC: &str = "compaction_interval_sec";
pub const ENABLE_SNAPSHOT_EXPIRATION: &str = "enable_snapshot_expiration";
pub const WRITE_MODE: &str = "write_mode";
pub const FORMAT_VERSION: &str = "format_version";
pub const SNAPSHOT_EXPIRATION_RETAIN_LAST: &str = "snapshot_expiration_retain_last";
pub const SNAPSHOT_EXPIRATION_MAX_AGE_MILLIS: &str = "snapshot_expiration_max_age_millis";
pub const SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_FILES: &str = "snapshot_expiration_clear_expired_files";
pub const SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_META_DATA: &str =
    "snapshot_expiration_clear_expired_meta_data";
pub const COMPACTION_MAX_SNAPSHOTS_NUM: &str = "compaction.max_snapshots_num";

pub const COMPACTION_SMALL_FILES_THRESHOLD_MB: &str = "compaction.small_files_threshold_mb";

pub const COMPACTION_DELETE_FILES_COUNT_THRESHOLD: &str = "compaction.delete_files_count_threshold";

pub const COMPACTION_TRIGGER_SNAPSHOT_COUNT: &str = "compaction.trigger_snapshot_count";

pub const COMPACTION_TARGET_FILE_SIZE_MB: &str = "compaction.target_file_size_mb";

pub const COMPACTION_TYPE: &str = "compaction.type";

pub const COMPACTION_WRITE_PARQUET_COMPRESSION: &str = "compaction.write_parquet_compression";
pub const COMPACTION_WRITE_PARQUET_MAX_ROW_GROUP_ROWS: &str =
    "compaction.write_parquet_max_row_group_rows";
pub const COMMIT_CHECKPOINT_SIZE_THRESHOLD_MB: &str = "commit_checkpoint_size_threshold_mb";
/// Default for `commit_checkpoint_size_threshold_mb` (see
/// `default_commit_checkpoint_size_threshold_mb` below).
pub const ICEBERG_DEFAULT_COMMIT_CHECKPOINT_SIZE_THRESHOLD_MB: u64 = 128;

/// `created_by` string embedded in written parquet files; includes the
/// crate version at compile time.
pub(super) const PARQUET_CREATED_BY: &str =
    concat!("risingwave version ", env!("CARGO_PKG_VERSION"));
131
/// serde default for `commit_retry_num`.
fn default_commit_retry_num() -> u32 {
    8
}

/// serde default for `commit_checkpoint_size_threshold_mb`.
fn default_commit_checkpoint_size_threshold_mb() -> Option<u64> {
    Some(ICEBERG_DEFAULT_COMMIT_CHECKPOINT_SIZE_THRESHOLD_MB)
}

/// serde default for `write_mode` (merge-on-read).
fn default_iceberg_write_mode() -> IcebergWriteMode {
    IcebergWriteMode::MergeOnRead
}

/// serde default for `format_version` (Iceberg format v2).
fn default_iceberg_format_version() -> FormatVersion {
    FormatVersion::V2
}

/// serde default helper: plain `true`.
fn default_true() -> bool {
    true
}

/// serde default helper: `Some(true)` for optional boolean options.
fn default_some_true() -> Option<bool> {
    Some(true)
}
155
156fn parse_format_version_str(value: &str) -> std::result::Result<FormatVersion, String> {
157 let parsed = value
158 .trim()
159 .parse::<u8>()
160 .map_err(|_| "`format-version` must be one of 1, 2, or 3".to_owned())?;
161 match parsed {
162 1 => Ok(FormatVersion::V1),
163 2 => Ok(FormatVersion::V2),
164 3 => Ok(FormatVersion::V3),
165 _ => Err("`format-version` must be one of 1, 2, or 3".to_owned()),
166 }
167}
168
169fn deserialize_format_version<'de, D>(
170 deserializer: D,
171) -> std::result::Result<FormatVersion, D::Error>
172where
173 D: Deserializer<'de>,
174{
175 struct FormatVersionVisitor;
176
177 impl<'de> Visitor<'de> for FormatVersionVisitor {
178 type Value = FormatVersion;
179
180 fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
181 formatter.write_str("format-version as 1, 2, or 3")
182 }
183
184 fn visit_u64<E>(self, value: u64) -> std::result::Result<Self::Value, E>
185 where
186 E: de::Error,
187 {
188 let value = u8::try_from(value)
189 .map_err(|_| E::custom("`format-version` must be one of 1, 2, or 3"))?;
190 parse_format_version_str(&value.to_string()).map_err(E::custom)
191 }
192
193 fn visit_i64<E>(self, value: i64) -> std::result::Result<Self::Value, E>
194 where
195 E: de::Error,
196 {
197 let value = u8::try_from(value)
198 .map_err(|_| E::custom("`format-version` must be one of 1, 2, or 3"))?;
199 parse_format_version_str(&value.to_string()).map_err(E::custom)
200 }
201
202 fn visit_str<E>(self, value: &str) -> std::result::Result<Self::Value, E>
203 where
204 E: de::Error,
205 {
206 parse_format_version_str(value).map_err(E::custom)
207 }
208
209 fn visit_string<E>(self, value: String) -> std::result::Result<Self::Value, E>
210 where
211 E: de::Error,
212 {
213 self.visit_str(&value)
214 }
215 }
216
217 deserializer.deserialize_any(FormatVersionVisitor)
218}
219
/// Compaction strategy selected via the `compaction.type` option.
///
/// Serialized in kebab-case; defaults to [`CompactionType::Full`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(rename_all = "kebab-case")]
pub enum CompactionType {
    /// Serialized as `full` (the default).
    #[default]
    Full,
    /// Serialized as `small-files`.
    SmallFiles,
    /// Serialized as `files-with-delete`.
    FilesWithDelete,
}
232
233impl CompactionType {
234 pub fn as_str(&self) -> &'static str {
235 match self {
236 CompactionType::Full => ICEBERG_COMPACTION_TYPE_FULL,
237 CompactionType::SmallFiles => ICEBERG_COMPACTION_TYPE_SMALL_FILES,
238 CompactionType::FilesWithDelete => ICEBERG_COMPACTION_TYPE_FILES_WITH_DELETE,
239 }
240 }
241}
242
243impl std::str::FromStr for CompactionType {
244 type Err = SinkError;
245
246 fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
247 match s {
248 ICEBERG_COMPACTION_TYPE_FULL => Ok(CompactionType::Full),
249 ICEBERG_COMPACTION_TYPE_SMALL_FILES => Ok(CompactionType::SmallFiles),
250 ICEBERG_COMPACTION_TYPE_FILES_WITH_DELETE => Ok(CompactionType::FilesWithDelete),
251 _ => Err(SinkError::Config(anyhow!(format!(
252 "invalid compaction_type: {}, must be one of: {}, {}, {}",
253 s,
254 ICEBERG_COMPACTION_TYPE_FULL,
255 ICEBERG_COMPACTION_TYPE_SMALL_FILES,
256 ICEBERG_COMPACTION_TYPE_FILES_WITH_DELETE
257 )))),
258 }
259 }
260}
261
262impl TryFrom<&str> for CompactionType {
263 type Error = <Self as std::str::FromStr>::Err;
264
265 fn try_from(value: &str) -> std::result::Result<Self, Self::Error> {
266 value.parse()
267 }
268}
269
270impl TryFrom<String> for CompactionType {
271 type Error = <Self as std::str::FromStr>::Err;
272
273 fn try_from(value: String) -> std::result::Result<Self, Self::Error> {
274 value.as_str().parse()
275 }
276}
277
278impl std::fmt::Display for CompactionType {
279 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
280 write!(f, "{}", self.as_str())
281 }
282}
283
/// Full configuration of the iceberg sink, deserialized from the sink's
/// `WITH (...)` options (see `IcebergConfig::from_btreemap`).
#[serde_as]
#[derive(Debug, Clone, PartialEq, Eq, Deserialize, WithOptions)]
pub struct IcebergConfig {
    /// Sink type; must be `append-only` or `upsert` (validated in
    /// `from_btreemap`).
    pub r#type: String,

    /// Treat the sink as append-only regardless of upstream changes.
    /// NOTE(review): exact downstream effect not visible in this file —
    /// confirm against the writer implementation.
    #[serde(default, deserialize_with = "deserialize_bool_from_string")]
    pub force_append_only: bool,

    // Catalog/warehouse connection options, flattened from the option map.
    #[serde(flatten)]
    pub(crate) common: IcebergCommon,

    // Target table identifier, flattened from the option map; validated in
    // `from_btreemap`.
    #[serde(flatten)]
    pub(crate) table: IcebergTableIdentifier,

    /// Primary key columns, parsed from a comma-separated string; required
    /// and non-empty for upsert sinks (checked in `from_btreemap`).
    #[serde(
        rename = "primary_key",
        default,
        deserialize_with = "deserialize_optional_string_seq_from_string"
    )]
    pub primary_key: Option<Vec<String>>,

    // Never deserialized: populated manually in `from_btreemap` from the
    // non-reserved `catalog.*` options and forwarded to the Java catalog.
    #[serde(skip)]
    pub java_catalog_props: HashMap<String, String>,

    /// Partitioning spec for the table. NOTE(review): format assumed from
    /// the name; confirm against the table-creation path.
    #[serde(default)]
    pub partition_by: Option<String>,

    /// Commit to iceberg every N checkpoints (must be > 0, checked in
    /// `from_btreemap`).
    #[serde(default = "iceberg_default_commit_checkpoint_interval")]
    #[serde_as(as = "DisplayFromStr")]
    #[with_option(allow_alter_on_fly)]
    pub commit_checkpoint_interval: u64,

    /// Size threshold (MB) for committing; must be > 0 when set (checked in
    /// `from_btreemap`). Defaults to 128 MB.
    #[serde(default = "default_commit_checkpoint_size_threshold_mb")]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    pub commit_checkpoint_size_threshold_mb: Option<u64>,

    /// Create the target table if it does not already exist.
    #[serde(default, deserialize_with = "deserialize_bool_from_string")]
    pub create_table_if_not_exists: bool,

    /// Exactly-once commit semantics; defaults to `Some(true)`.
    #[serde(default = "default_some_true")]
    #[serde_as(as = "Option<DisplayFromStr>")]
    pub is_exactly_once: Option<bool>,
    /// Number of retries for a failed commit; defaults to 8.
    #[serde(default = "default_commit_retry_num")]
    pub commit_retry_num: u32,

    /// Whether background compaction is enabled for this sink.
    #[serde(
        rename = "enable_compaction",
        default,
        deserialize_with = "deserialize_bool_from_string"
    )]
    #[with_option(allow_alter_on_fly)]
    pub enable_compaction: bool,

    /// Interval between compaction runs; `compaction_interval_sec()`
    /// defaults this to 3600s when unset.
    #[serde(rename = "compaction_interval_sec", default)]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    pub compaction_interval_sec: Option<u64>,

    /// Whether old snapshots are expired automatically.
    #[serde(
        rename = "enable_snapshot_expiration",
        default,
        deserialize_with = "deserialize_bool_from_string"
    )]
    #[with_option(allow_alter_on_fly)]
    pub enable_snapshot_expiration: bool,

    /// Write mode; defaults to merge-on-read. Copy-on-write is rejected for
    /// append-only sinks (see `validate_append_only_write_mode`).
    #[serde(rename = "write_mode", default = "default_iceberg_write_mode")]
    pub write_mode: IcebergWriteMode,

    /// Iceberg table format version (1, 2, or 3); defaults to 2. Accepts
    /// integer or string input (see `deserialize_format_version`).
    #[serde(
        rename = "format_version",
        default = "default_iceberg_format_version",
        deserialize_with = "deserialize_format_version"
    )]
    pub format_version: FormatVersion,

    /// Maximum snapshot age in milliseconds; used by
    /// `snapshot_expiration_timestamp_ms` to derive the expiry cutoff.
    #[serde(rename = "snapshot_expiration_max_age_millis", default)]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    pub snapshot_expiration_max_age_millis: Option<i64>,

    /// Minimum number of most-recent snapshots to retain on expiration.
    #[serde(rename = "snapshot_expiration_retain_last", default)]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    pub snapshot_expiration_retain_last: Option<i32>,

    /// Whether expired data files are deleted during snapshot expiration;
    /// defaults to true.
    #[serde(
        rename = "snapshot_expiration_clear_expired_files",
        default = "default_true",
        deserialize_with = "deserialize_bool_from_string"
    )]
    #[with_option(allow_alter_on_fly)]
    pub snapshot_expiration_clear_expired_files: bool,

    /// Whether expired metadata is deleted during snapshot expiration;
    /// defaults to true.
    #[serde(
        rename = "snapshot_expiration_clear_expired_meta_data",
        default = "default_true",
        deserialize_with = "deserialize_bool_from_string"
    )]
    #[with_option(allow_alter_on_fly)]
    pub snapshot_expiration_clear_expired_meta_data: bool,

    /// Cap on snapshots considered before compaction. NOTE(review): exact
    /// semantics live in the compaction code — confirm there.
    #[serde(rename = "compaction.max_snapshots_num", default)]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    pub max_snapshots_num_before_compaction: Option<usize>,

    /// Files below this size (MB) count as "small"; defaults to 64 via
    /// `small_files_threshold_mb()`.
    #[serde(rename = "compaction.small_files_threshold_mb", default)]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    pub small_files_threshold_mb: Option<u64>,

    /// Delete-file count that triggers compaction; defaults to 256 via
    /// `delete_files_count_threshold()`.
    #[serde(rename = "compaction.delete_files_count_threshold", default)]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    pub delete_files_count_threshold: Option<usize>,

    /// Snapshot count that triggers compaction; effectively disabled
    /// (usize::MAX) when unset — see `trigger_snapshot_count()`.
    #[serde(rename = "compaction.trigger_snapshot_count", default)]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    pub trigger_snapshot_count: Option<usize>,

    /// Target output file size (MB) for compaction; defaults to 1024 via
    /// `target_file_size_mb()`.
    #[serde(rename = "compaction.target_file_size_mb", default)]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    pub target_file_size_mb: Option<u64>,

    /// Compaction strategy; defaults to `full` via `compaction_type()`.
    #[serde(rename = "compaction.type", default)]
    #[with_option(allow_alter_on_fly)]
    pub compaction_type: Option<CompactionType>,

    /// Parquet compression codec for compaction output; defaults to "zstd"
    /// via `write_parquet_compression()`.
    #[serde(rename = "compaction.write_parquet_compression", default)]
    #[with_option(allow_alter_on_fly)]
    pub write_parquet_compression: Option<String>,

    /// Maximum rows per parquet row group for compaction output; defaults
    /// to 122880 via `write_parquet_max_row_group_rows()`.
    #[serde(rename = "compaction.write_parquet_max_row_group_rows", default)]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    pub write_parquet_max_row_group_rows: Option<usize>,
}
451
452impl EnforceSecret for IcebergConfig {
453 fn enforce_secret<'a>(
454 prop_iter: impl Iterator<Item = &'a str>,
455 ) -> crate::error::ConnectorResult<()> {
456 for prop in prop_iter {
457 IcebergCommon::enforce_one(prop)?;
458 }
459 Ok(())
460 }
461
462 fn enforce_one(prop: &str) -> crate::error::ConnectorResult<()> {
463 IcebergCommon::enforce_one(prop)
464 }
465}
466
467impl IcebergConfig {
468 pub fn validate_append_only_write_mode(
471 sink_type: &str,
472 write_mode: IcebergWriteMode,
473 ) -> Result<()> {
474 if sink_type == SINK_TYPE_APPEND_ONLY && write_mode == IcebergWriteMode::CopyOnWrite {
475 return Err(SinkError::Config(anyhow!(
476 "'copy-on-write' mode is not supported for append-only iceberg sink. \
477 Please use 'merge-on-read' instead, which is strictly better for append-only workloads."
478 )));
479 }
480 Ok(())
481 }
482
483 pub fn from_btreemap(values: BTreeMap<String, String>) -> Result<Self> {
484 let mut config =
485 serde_json::from_value::<IcebergConfig>(serde_json::to_value(&values).unwrap())
486 .map_err(|e| SinkError::Config(anyhow!(e)))?;
487
488 if config.r#type != SINK_TYPE_APPEND_ONLY && config.r#type != SINK_TYPE_UPSERT {
489 return Err(SinkError::Config(anyhow!(
490 "`{}` must be {}, or {}",
491 SINK_TYPE_OPTION,
492 SINK_TYPE_APPEND_ONLY,
493 SINK_TYPE_UPSERT
494 )));
495 }
496
497 if config.r#type == SINK_TYPE_UPSERT {
498 if let Some(primary_key) = &config.primary_key {
499 if primary_key.is_empty() {
500 return Err(SinkError::Config(anyhow!(
501 "`primary-key` must not be empty in {}",
502 SINK_TYPE_UPSERT
503 )));
504 }
505 } else {
506 return Err(SinkError::Config(anyhow!(
507 "Must set `primary-key` in {}",
508 SINK_TYPE_UPSERT
509 )));
510 }
511 }
512
513 Self::validate_append_only_write_mode(&config.r#type, config.write_mode)?;
515
516 config.java_catalog_props = values
518 .iter()
519 .filter(|(k, _v)| {
520 k.starts_with("catalog.")
521 && k != &"catalog.uri"
522 && k != &"catalog.type"
523 && k != &"catalog.name"
524 && k != &"catalog.header"
525 })
526 .map(|(k, v)| (k[8..].to_string(), v.clone()))
527 .collect();
528
529 if config.commit_checkpoint_interval == 0 {
530 return Err(SinkError::Config(anyhow!(
531 "`commit-checkpoint-interval` must be greater than 0"
532 )));
533 }
534
535 if config.commit_checkpoint_size_threshold_mb == Some(0) {
536 return Err(SinkError::Config(anyhow!(
537 "`commit_checkpoint_size_threshold_mb` must be greater than 0"
538 )));
539 }
540
541 config
543 .table
544 .validate()
545 .map_err(|e| SinkError::Config(anyhow!(e)))?;
546
547 Ok(config)
548 }
549
550 pub fn catalog_type(&self) -> &str {
551 self.common.catalog_type()
552 }
553
554 pub async fn load_table(&self) -> Result<Table> {
555 self.common
556 .load_table(&self.table, &self.java_catalog_props)
557 .await
558 .map_err(Into::into)
559 }
560
561 pub async fn create_catalog(&self) -> Result<Arc<dyn Catalog>> {
562 self.common
563 .create_catalog(&self.java_catalog_props)
564 .await
565 .map_err(Into::into)
566 }
567
568 pub fn full_table_name(&self) -> Result<TableIdent> {
569 self.table.to_table_ident().map_err(Into::into)
570 }
571
572 pub fn catalog_name(&self) -> String {
573 self.common.catalog_name()
574 }
575
576 pub fn table_format_version(&self) -> FormatVersion {
577 self.format_version
578 }
579
580 pub fn compaction_interval_sec(&self) -> u64 {
581 self.compaction_interval_sec.unwrap_or(3600)
583 }
584
585 pub fn snapshot_expiration_timestamp_ms(&self, current_time_ms: i64) -> Option<i64> {
588 self.snapshot_expiration_max_age_millis
589 .map(|max_age_millis| current_time_ms - max_age_millis)
590 }
591
592 pub fn trigger_snapshot_count(&self) -> usize {
593 self.trigger_snapshot_count.unwrap_or(usize::MAX)
594 }
595
596 pub fn small_files_threshold_mb(&self) -> u64 {
597 self.small_files_threshold_mb.unwrap_or(64)
598 }
599
600 pub fn delete_files_count_threshold(&self) -> usize {
601 self.delete_files_count_threshold.unwrap_or(256)
602 }
603
604 pub fn target_file_size_mb(&self) -> u64 {
605 self.target_file_size_mb.unwrap_or(1024)
606 }
607
608 pub fn commit_checkpoint_size_threshold_bytes(&self) -> Option<u64> {
609 self.commit_checkpoint_size_threshold_mb
610 .map(|threshold_mb| threshold_mb.saturating_mul(1024 * 1024))
611 }
612
613 pub fn compaction_type(&self) -> CompactionType {
616 self.compaction_type.unwrap_or_default()
617 }
618
619 pub fn write_parquet_compression(&self) -> &str {
622 self.write_parquet_compression.as_deref().unwrap_or("zstd")
623 }
624
625 pub fn write_parquet_max_row_group_rows(&self) -> usize {
628 self.write_parquet_max_row_group_rows.unwrap_or(122880)
629 }
630
631 pub fn get_parquet_compression(&self) -> Compression {
634 parse_parquet_compression(self.write_parquet_compression())
635 }
636}
637
638pub fn parse_parquet_compression(codec: &str) -> Compression {
640 match codec.to_lowercase().as_str() {
641 "uncompressed" => Compression::UNCOMPRESSED,
642 "snappy" => Compression::SNAPPY,
643 "gzip" => Compression::GZIP(Default::default()),
644 "lzo" => Compression::LZO,
645 "brotli" => Compression::BROTLI(Default::default()),
646 "lz4" => Compression::LZ4,
647 "zstd" => Compression::ZSTD(Default::default()),
648 _ => {
649 tracing::warn!(
650 "Unknown compression codec '{}', falling back to SNAPPY",
651 codec
652 );
653 Compression::SNAPPY
654 }
655 }
656}
657
658pub fn commit_branch(sink_type: &str, write_mode: IcebergWriteMode) -> String {
661 if should_enable_iceberg_cow(sink_type, write_mode) {
662 ICEBERG_COW_BRANCH.to_owned()
663 } else {
664 MAIN_BRANCH.to_owned()
665 }
666}
667
668pub fn should_enable_iceberg_cow(sink_type: &str, write_mode: IcebergWriteMode) -> bool {
669 sink_type == SINK_TYPE_UPSERT && write_mode == IcebergWriteMode::CopyOnWrite
670}
671
// Empty (marker-style) impls so these option value types plug into the
// `with_options` machinery used by the `WithOptions` derive above.
impl crate::with_options::WithOptions for IcebergWriteMode {}

impl crate::with_options::WithOptions for FormatVersion {}

impl crate::with_options::WithOptions for CompactionType {}