mod prometheus;

use std::collections::{BTreeMap, HashMap};
use std::fmt::Debug;
use std::num::NonZeroU64;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;

use anyhow::{Context, anyhow};
use async_trait::async_trait;
use await_tree::InstrumentAwait;
use iceberg::arrow::{
    RecordBatchPartitionSplitter, arrow_schema_to_schema, schema_to_arrow_schema,
};
use iceberg::spec::{
    DataFile, FormatVersion, MAIN_BRANCH, Operation, PartitionSpecRef,
    SchemaRef as IcebergSchemaRef, SerializedDataFile, TableProperties, Transform,
    UnboundPartitionField, UnboundPartitionSpec,
};
use iceberg::table::Table;
use iceberg::transaction::{ApplyTransactionAction, FastAppendAction, Transaction};
use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
use iceberg::writer::base_writer::deletion_vector_writer::{
    DeletionVectorWriter, DeletionVectorWriterBuilder,
};
use iceberg::writer::base_writer::equality_delete_writer::{
    EqualityDeleteFileWriterBuilder, EqualityDeleteWriterConfig,
};
use iceberg::writer::base_writer::position_delete_file_writer::{
    POSITION_DELETE_SCHEMA, PositionDeleteFileWriter, PositionDeleteFileWriterBuilder,
    PositionDeleteInput,
};
use iceberg::writer::delta_writer::{DELETE_OP, DeltaWriterBuilder, INSERT_OP};
use iceberg::writer::file_writer::ParquetWriterBuilder;
use iceberg::writer::file_writer::location_generator::{
    DefaultFileNameGenerator, DefaultLocationGenerator,
};
use iceberg::writer::file_writer::rolling_writer::RollingFileWriterBuilder;
use iceberg::writer::task_writer::TaskWriter;
use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
use iceberg::{Catalog, NamespaceIdent, TableCreation, TableIdent};
use itertools::Itertools;
use parquet::file::properties::WriterProperties;
use prometheus::monitored_general_writer::MonitoredGeneralWriterBuilder;
use regex::Regex;
use risingwave_common::array::arrow::arrow_array_iceberg::{Int32Array, RecordBatch};
use risingwave_common::array::arrow::arrow_schema_iceberg::{
    self, DataType as ArrowDataType, Field as ArrowField, Fields as ArrowFields,
    Schema as ArrowSchema, SchemaRef,
};
use risingwave_common::array::arrow::{IcebergArrowConvert, IcebergCreateTableArrowConvert};
use risingwave_common::array::{Op, StreamChunk};
use risingwave_common::bail;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{Field, Schema};
use risingwave_common::error::IcebergError;
use risingwave_common::metrics::{LabelGuardedHistogram, LabelGuardedIntCounter};
use risingwave_common_estimate_size::EstimateSize;
use risingwave_pb::connector_service::SinkMetadata;
use risingwave_pb::connector_service::sink_metadata::Metadata::Serialized;
use risingwave_pb::connector_service::sink_metadata::SerializedMetadata;
use risingwave_pb::stream_plan::PbSinkSchemaChange;
use serde::de::{self, Deserializer, Visitor};
use serde::{Deserialize, Serialize};
use serde_json::from_value;
use serde_with::{DisplayFromStr, serde_as};
use thiserror_ext::AsReport;
use tokio::sync::mpsc::UnboundedSender;
use tokio_retry::RetryIf;
use tokio_retry::strategy::{ExponentialBackoff, jitter};
use tracing::warn;
use url::Url;
use uuid::Uuid;
use with_options::WithOptions;

use super::decouple_checkpoint_log_sink::iceberg_default_commit_checkpoint_interval;
use super::{
    GLOBAL_SINK_METRICS, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT, Sink,
    SinkError, SinkWriterParam,
};
use crate::connector_common::{IcebergCommon, IcebergSinkCompactionUpdate, IcebergTableIdentifier};
use crate::enforce_secret::EnforceSecret;
use crate::sink::catalog::SinkId;
use crate::sink::coordinate::CoordinatedLogSinker;
use crate::sink::writer::SinkWriter;
use crate::sink::{
    Result, SinglePhaseCommitCoordinator, SinkCommitCoordinator, SinkParam,
    TwoPhaseCommitCoordinator,
};
use crate::{deserialize_bool_from_string, deserialize_optional_string_seq_from_string};

pub const ICEBERG_SINK: &str = "iceberg";

pub const ICEBERG_COW_BRANCH: &str = "ingestion";
pub const ICEBERG_WRITE_MODE_MERGE_ON_READ: &str = "merge-on-read";
pub const ICEBERG_WRITE_MODE_COPY_ON_WRITE: &str = "copy-on-write";
pub const ICEBERG_COMPACTION_TYPE_FULL: &str = "full";
pub const ICEBERG_COMPACTION_TYPE_SMALL_FILES: &str = "small-files";
pub const ICEBERG_COMPACTION_TYPE_FILES_WITH_DELETE: &str = "files-with-delete";

pub const PARTITION_DATA_ID_START: i32 = 1000;

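/// How updates and deletes are materialized in the Iceberg table: `merge-on-read`
/// writes delete files next to the data files, while `copy-on-write` rewrites the
/// affected data files.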
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
pub enum IcebergWriteMode {
    #[default]
    MergeOnRead,
    CopyOnWrite,
}

impl IcebergWriteMode {
    pub fn as_str(self) -> &'static str {
        match self {
            IcebergWriteMode::MergeOnRead => ICEBERG_WRITE_MODE_MERGE_ON_READ,
            IcebergWriteMode::CopyOnWrite => ICEBERG_WRITE_MODE_COPY_ON_WRITE,
        }
    }
}

impl std::str::FromStr for IcebergWriteMode {
    type Err = SinkError;

    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
        match s {
            ICEBERG_WRITE_MODE_MERGE_ON_READ => Ok(IcebergWriteMode::MergeOnRead),
            ICEBERG_WRITE_MODE_COPY_ON_WRITE => Ok(IcebergWriteMode::CopyOnWrite),
            _ => Err(SinkError::Config(anyhow!(format!(
                "invalid write_mode: {}, must be one of: {}, {}",
                s, ICEBERG_WRITE_MODE_MERGE_ON_READ, ICEBERG_WRITE_MODE_COPY_ON_WRITE
            )))),
        }
    }
}

impl TryFrom<&str> for IcebergWriteMode {
    type Error = <Self as std::str::FromStr>::Err;

    fn try_from(value: &str) -> std::result::Result<Self, Self::Error> {
        value.parse()
    }
}

impl TryFrom<String> for IcebergWriteMode {
    type Error = <Self as std::str::FromStr>::Err;

    fn try_from(value: String) -> std::result::Result<Self, Self::Error> {
        value.as_str().parse()
    }
}

impl std::fmt::Display for IcebergWriteMode {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(self.as_str())
    }
}

pub const ENABLE_COMPACTION: &str = "enable_compaction";
pub const COMPACTION_INTERVAL_SEC: &str = "compaction_interval_sec";
pub const ENABLE_SNAPSHOT_EXPIRATION: &str = "enable_snapshot_expiration";
pub const WRITE_MODE: &str = "write_mode";
pub const FORMAT_VERSION: &str = "format_version";
pub const SNAPSHOT_EXPIRATION_RETAIN_LAST: &str = "snapshot_expiration_retain_last";
pub const SNAPSHOT_EXPIRATION_MAX_AGE_MILLIS: &str = "snapshot_expiration_max_age_millis";
pub const SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_FILES: &str = "snapshot_expiration_clear_expired_files";
pub const SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_META_DATA: &str =
    "snapshot_expiration_clear_expired_meta_data";
pub const COMPACTION_MAX_SNAPSHOTS_NUM: &str = "compaction.max_snapshots_num";

pub const COMPACTION_SMALL_FILES_THRESHOLD_MB: &str = "compaction.small_files_threshold_mb";

pub const COMPACTION_DELETE_FILES_COUNT_THRESHOLD: &str = "compaction.delete_files_count_threshold";

pub const COMPACTION_TRIGGER_SNAPSHOT_COUNT: &str = "compaction.trigger_snapshot_count";

pub const COMPACTION_TARGET_FILE_SIZE_MB: &str = "compaction.target_file_size_mb";

pub const COMPACTION_TYPE: &str = "compaction.type";

fn default_commit_retry_num() -> u32 {
    8
}

fn default_iceberg_write_mode() -> IcebergWriteMode {
    IcebergWriteMode::MergeOnRead
}

fn default_iceberg_format_version() -> FormatVersion {
    FormatVersion::V2
}

fn default_true() -> bool {
    true
}

fn default_some_true() -> Option<bool> {
    Some(true)
}

fn parse_format_version_str(value: &str) -> std::result::Result<FormatVersion, String> {
    let parsed = value
        .trim()
        .parse::<u8>()
        .map_err(|_| "`format-version` must be one of 1, 2, or 3".to_owned())?;
    match parsed {
        1 => Ok(FormatVersion::V1),
        2 => Ok(FormatVersion::V2),
        3 => Ok(FormatVersion::V3),
        _ => Err("`format-version` must be one of 1, 2, or 3".to_owned()),
    }
}

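/// Accepts `format-version` as either a JSON number or a string ("1", "2", "3") and
/// maps it to the corresponding Iceberg [`FormatVersion`].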
fn deserialize_format_version<'de, D>(
    deserializer: D,
) -> std::result::Result<FormatVersion, D::Error>
where
    D: Deserializer<'de>,
{
    struct FormatVersionVisitor;

    impl<'de> Visitor<'de> for FormatVersionVisitor {
        type Value = FormatVersion;

        fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
            formatter.write_str("format-version as 1, 2, or 3")
        }

        fn visit_u64<E>(self, value: u64) -> std::result::Result<Self::Value, E>
        where
            E: de::Error,
        {
            let value = u8::try_from(value)
                .map_err(|_| E::custom("`format-version` must be one of 1, 2, or 3"))?;
            parse_format_version_str(&value.to_string()).map_err(E::custom)
        }

        fn visit_i64<E>(self, value: i64) -> std::result::Result<Self::Value, E>
        where
            E: de::Error,
        {
            let value = u8::try_from(value)
                .map_err(|_| E::custom("`format-version` must be one of 1, 2, or 3"))?;
            parse_format_version_str(&value.to_string()).map_err(E::custom)
        }

        fn visit_str<E>(self, value: &str) -> std::result::Result<Self::Value, E>
        where
            E: de::Error,
        {
            parse_format_version_str(value).map_err(E::custom)
        }

        fn visit_string<E>(self, value: String) -> std::result::Result<Self::Value, E>
        where
            E: de::Error,
        {
            self.visit_str(&value)
        }
    }

    deserializer.deserialize_any(FormatVersionVisitor)
}

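/// Compaction strategy selected via `compaction.type`: rewrite everything (`full`),
/// target small data files (`small-files`), or target files that have associated
/// delete files (`files-with-delete`).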
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(rename_all = "kebab-case")]
pub enum CompactionType {
    #[default]
    Full,
    SmallFiles,
    FilesWithDelete,
}

impl CompactionType {
    pub fn as_str(&self) -> &'static str {
        match self {
            CompactionType::Full => ICEBERG_COMPACTION_TYPE_FULL,
            CompactionType::SmallFiles => ICEBERG_COMPACTION_TYPE_SMALL_FILES,
            CompactionType::FilesWithDelete => ICEBERG_COMPACTION_TYPE_FILES_WITH_DELETE,
        }
    }
}

impl std::str::FromStr for CompactionType {
    type Err = SinkError;

    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
        match s {
            ICEBERG_COMPACTION_TYPE_FULL => Ok(CompactionType::Full),
            ICEBERG_COMPACTION_TYPE_SMALL_FILES => Ok(CompactionType::SmallFiles),
            ICEBERG_COMPACTION_TYPE_FILES_WITH_DELETE => Ok(CompactionType::FilesWithDelete),
            _ => Err(SinkError::Config(anyhow!(format!(
                "invalid compaction_type: {}, must be one of: {}, {}, {}",
                s,
                ICEBERG_COMPACTION_TYPE_FULL,
                ICEBERG_COMPACTION_TYPE_SMALL_FILES,
                ICEBERG_COMPACTION_TYPE_FILES_WITH_DELETE
            )))),
        }
    }
}

impl TryFrom<&str> for CompactionType {
    type Error = <Self as std::str::FromStr>::Err;

    fn try_from(value: &str) -> std::result::Result<Self, Self::Error> {
        value.parse()
    }
}

impl TryFrom<String> for CompactionType {
    type Error = <Self as std::str::FromStr>::Err;

    fn try_from(value: String) -> std::result::Result<Self, Self::Error> {
        value.as_str().parse()
    }
}

impl std::fmt::Display for CompactionType {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}", self.as_str())
    }
}

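/// User-facing options of the Iceberg sink, deserialized from the sink's `WITH`
/// properties. Keys prefixed with `catalog.` (other than the reserved ones) are
/// forwarded verbatim to the Java catalog through `java_catalog_props`.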
#[serde_as]
#[derive(Debug, Clone, PartialEq, Eq, Deserialize, WithOptions)]
pub struct IcebergConfig {
    pub r#type: String,

    #[serde(default, deserialize_with = "deserialize_bool_from_string")]
    pub force_append_only: bool,

    #[serde(flatten)]
    common: IcebergCommon,

    #[serde(flatten)]
    table: IcebergTableIdentifier,

    #[serde(
        rename = "primary_key",
        default,
        deserialize_with = "deserialize_optional_string_seq_from_string"
    )]
    pub primary_key: Option<Vec<String>>,

    #[serde(skip)]
    pub java_catalog_props: HashMap<String, String>,

    #[serde(default)]
    pub partition_by: Option<String>,

    #[serde(default = "iceberg_default_commit_checkpoint_interval")]
    #[serde_as(as = "DisplayFromStr")]
    #[with_option(allow_alter_on_fly)]
    pub commit_checkpoint_interval: u64,

    #[serde(default, deserialize_with = "deserialize_bool_from_string")]
    pub create_table_if_not_exists: bool,

    #[serde(default = "default_some_true")]
    #[serde_as(as = "Option<DisplayFromStr>")]
    pub is_exactly_once: Option<bool>,

    #[serde(default = "default_commit_retry_num")]
    pub commit_retry_num: u32,

    #[serde(
        rename = "enable_compaction",
        default,
        deserialize_with = "deserialize_bool_from_string"
    )]
    #[with_option(allow_alter_on_fly)]
    pub enable_compaction: bool,

    #[serde(rename = "compaction_interval_sec", default)]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    pub compaction_interval_sec: Option<u64>,

    #[serde(
        rename = "enable_snapshot_expiration",
        default,
        deserialize_with = "deserialize_bool_from_string"
    )]
    #[with_option(allow_alter_on_fly)]
    pub enable_snapshot_expiration: bool,

    #[serde(rename = "write_mode", default = "default_iceberg_write_mode")]
    pub write_mode: IcebergWriteMode,

    #[serde(
        rename = "format_version",
        default = "default_iceberg_format_version",
        deserialize_with = "deserialize_format_version"
    )]
    pub format_version: FormatVersion,

    #[serde(rename = "snapshot_expiration_max_age_millis", default)]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    pub snapshot_expiration_max_age_millis: Option<i64>,

    #[serde(rename = "snapshot_expiration_retain_last", default)]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    pub snapshot_expiration_retain_last: Option<i32>,

    #[serde(
        rename = "snapshot_expiration_clear_expired_files",
        default = "default_true",
        deserialize_with = "deserialize_bool_from_string"
    )]
    #[with_option(allow_alter_on_fly)]
    pub snapshot_expiration_clear_expired_files: bool,

    #[serde(
        rename = "snapshot_expiration_clear_expired_meta_data",
        default = "default_true",
        deserialize_with = "deserialize_bool_from_string"
    )]
    #[with_option(allow_alter_on_fly)]
    pub snapshot_expiration_clear_expired_meta_data: bool,

    #[serde(rename = "compaction.max_snapshots_num", default)]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    pub max_snapshots_num_before_compaction: Option<usize>,

    #[serde(rename = "compaction.small_files_threshold_mb", default)]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    pub small_files_threshold_mb: Option<u64>,

    #[serde(rename = "compaction.delete_files_count_threshold", default)]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    pub delete_files_count_threshold: Option<usize>,

    #[serde(rename = "compaction.trigger_snapshot_count", default)]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    pub trigger_snapshot_count: Option<usize>,

    #[serde(rename = "compaction.target_file_size_mb", default)]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    pub target_file_size_mb: Option<u64>,

    #[serde(rename = "compaction.type", default)]
    #[with_option(allow_alter_on_fly)]
    pub compaction_type: Option<CompactionType>,
}

impl EnforceSecret for IcebergConfig {
    fn enforce_secret<'a>(
        prop_iter: impl Iterator<Item = &'a str>,
    ) -> crate::error::ConnectorResult<()> {
        for prop in prop_iter {
            IcebergCommon::enforce_one(prop)?;
        }
        Ok(())
    }

    fn enforce_one(prop: &str) -> crate::error::ConnectorResult<()> {
        IcebergCommon::enforce_one(prop)
    }
}

impl IcebergConfig {
    fn validate_append_only_write_mode(
        sink_type: &str,
        write_mode: IcebergWriteMode,
    ) -> Result<()> {
        if sink_type == SINK_TYPE_APPEND_ONLY && write_mode == IcebergWriteMode::CopyOnWrite {
            return Err(SinkError::Config(anyhow!(
                "'copy-on-write' mode is not supported for append-only iceberg sink. \
                Please use 'merge-on-read' instead, which is strictly better for append-only workloads."
            )));
        }
        Ok(())
    }

    pub fn from_btreemap(values: BTreeMap<String, String>) -> Result<Self> {
        let mut config =
            serde_json::from_value::<IcebergConfig>(serde_json::to_value(&values).unwrap())
                .map_err(|e| SinkError::Config(anyhow!(e)))?;

        if config.r#type != SINK_TYPE_APPEND_ONLY && config.r#type != SINK_TYPE_UPSERT {
            return Err(SinkError::Config(anyhow!(
                "`{}` must be {}, or {}",
                SINK_TYPE_OPTION,
                SINK_TYPE_APPEND_ONLY,
                SINK_TYPE_UPSERT
            )));
        }

        if config.r#type == SINK_TYPE_UPSERT {
            if let Some(primary_key) = &config.primary_key {
                if primary_key.is_empty() {
                    return Err(SinkError::Config(anyhow!(
                        "`primary-key` must not be empty in {}",
                        SINK_TYPE_UPSERT
                    )));
                }
            } else {
                return Err(SinkError::Config(anyhow!(
                    "Must set `primary-key` in {}",
                    SINK_TYPE_UPSERT
                )));
            }
        }

        Self::validate_append_only_write_mode(&config.r#type, config.write_mode)?;

        config.java_catalog_props = values
            .iter()
            .filter(|(k, _v)| {
                k.starts_with("catalog.")
                    && k != &"catalog.uri"
                    && k != &"catalog.type"
                    && k != &"catalog.name"
                    && k != &"catalog.header"
            })
            .map(|(k, v)| (k[8..].to_string(), v.clone()))
            .collect();

        if config.commit_checkpoint_interval == 0 {
            return Err(SinkError::Config(anyhow!(
                "`commit-checkpoint-interval` must be greater than 0"
            )));
        }

        Ok(config)
    }

    pub fn catalog_type(&self) -> &str {
        self.common.catalog_type()
    }

    pub async fn load_table(&self) -> Result<Table> {
        self.common
            .load_table(&self.table, &self.java_catalog_props)
            .await
            .map_err(Into::into)
    }

    pub async fn create_catalog(&self) -> Result<Arc<dyn Catalog>> {
        self.common
            .create_catalog(&self.java_catalog_props)
            .await
            .map_err(Into::into)
    }

    pub fn full_table_name(&self) -> Result<TableIdent> {
        self.table.to_table_ident().map_err(Into::into)
    }

    pub fn catalog_name(&self) -> String {
        self.common.catalog_name()
    }

    pub fn table_format_version(&self) -> FormatVersion {
        self.format_version
    }

    pub fn compaction_interval_sec(&self) -> u64 {
        self.compaction_interval_sec.unwrap_or(3600)
    }

    pub fn snapshot_expiration_timestamp_ms(&self, current_time_ms: i64) -> Option<i64> {
        self.snapshot_expiration_max_age_millis
            .map(|max_age_millis| current_time_ms - max_age_millis)
    }

    pub fn trigger_snapshot_count(&self) -> usize {
        self.trigger_snapshot_count.unwrap_or(usize::MAX)
    }

    pub fn small_files_threshold_mb(&self) -> u64 {
        self.small_files_threshold_mb.unwrap_or(64)
    }

    pub fn delete_files_count_threshold(&self) -> usize {
        self.delete_files_count_threshold.unwrap_or(256)
    }

    pub fn target_file_size_mb(&self) -> u64 {
        self.target_file_size_mb.unwrap_or(1024)
    }

    pub fn compaction_type(&self) -> CompactionType {
        self.compaction_type.unwrap_or_default()
    }
}

pub struct IcebergSink {
    pub config: IcebergConfig,
    param: SinkParam,
    unique_column_ids: Option<Vec<usize>>,
}

impl EnforceSecret for IcebergSink {
    fn enforce_secret<'a>(
        prop_iter: impl Iterator<Item = &'a str>,
    ) -> crate::error::ConnectorResult<()> {
        for prop in prop_iter {
            IcebergConfig::enforce_one(prop)?;
        }
        Ok(())
    }
}

impl TryFrom<SinkParam> for IcebergSink {
    type Error = SinkError;

    fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
        let config = IcebergConfig::from_btreemap(param.properties.clone())?;
        IcebergSink::new(config, param)
    }
}

impl Debug for IcebergSink {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("IcebergSink")
            .field("config", &self.config)
            .finish()
    }
}

async fn create_and_validate_table_impl(
    config: &IcebergConfig,
    param: &SinkParam,
) -> Result<Table> {
    if config.create_table_if_not_exists {
        create_table_if_not_exists_impl(config, param).await?;
    }

    let table = config
        .load_table()
        .await
        .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;

    let sink_schema = param.schema();
    let iceberg_arrow_schema = schema_to_arrow_schema(table.metadata().current_schema())
        .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;

    try_matches_arrow_schema(&sink_schema, &iceberg_arrow_schema)
        .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;

    Ok(table)
}

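/// Creates the target namespace and table in the catalog when they do not exist yet,
/// deriving the Iceberg schema from the sink's column definitions and the optional
/// `partition_by` clause.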
async fn create_table_if_not_exists_impl(config: &IcebergConfig, param: &SinkParam) -> Result<()> {
    let catalog = config.create_catalog().await?;
    let namespace = if let Some(database_name) = config.table.database_name() {
        let namespace = NamespaceIdent::new(database_name.to_owned());
        if !catalog
            .namespace_exists(&namespace)
            .await
            .map_err(|e| SinkError::Iceberg(anyhow!(e)))?
        {
            catalog
                .create_namespace(&namespace, HashMap::default())
                .await
                .map_err(|e| SinkError::Iceberg(anyhow!(e)))
                .context("failed to create iceberg namespace")?;
        }
        namespace
    } else {
        bail!("database name must be set if you want to create table")
    };

    let table_id = config
        .full_table_name()
        .context("Unable to parse table name")?;
    if !catalog
        .table_exists(&table_id)
        .await
        .map_err(|e| SinkError::Iceberg(anyhow!(e)))?
    {
        let iceberg_create_table_arrow_convert = IcebergCreateTableArrowConvert::default();
        let arrow_fields = param
            .columns
            .iter()
            .map(|column| {
                Ok(iceberg_create_table_arrow_convert
                    .to_arrow_field(&column.name, &column.data_type)
                    .map_err(|e| SinkError::Iceberg(anyhow!(e)))
                    .context(format!(
                        "failed to convert {}: {} to arrow type",
                        &column.name, &column.data_type
                    ))?)
            })
            .collect::<Result<Vec<ArrowField>>>()?;
        let arrow_schema = arrow_schema_iceberg::Schema::new(arrow_fields);
        let iceberg_schema = iceberg::arrow::arrow_schema_to_schema(&arrow_schema)
            .map_err(|e| SinkError::Iceberg(anyhow!(e)))
            .context("failed to convert arrow schema to iceberg schema")?;

        let location = {
            let mut names = namespace.clone().inner();
            names.push(config.table.table_name().to_owned());
            match &config.common.warehouse_path {
                Some(warehouse_path) => {
                    let is_s3_tables = warehouse_path.starts_with("arn:aws:s3tables");
                    let is_bq_catalog_federation = warehouse_path.starts_with("bq://");
                    let url = Url::parse(warehouse_path);
                    if url.is_err() || is_s3_tables || is_bq_catalog_federation {
                        if config.common.catalog_type() == "rest"
                            || config.common.catalog_type() == "rest_rust"
                        {
                            None
                        } else {
                            bail!(format!("Invalid warehouse path: {}", warehouse_path))
                        }
                    } else if warehouse_path.ends_with('/') {
                        Some(format!("{}{}", warehouse_path, names.join("/")))
                    } else {
                        Some(format!("{}/{}", warehouse_path, names.join("/")))
                    }
                }
                None => None,
            }
        };

        let partition_spec = match &config.partition_by {
            Some(partition_by) => {
                let mut partition_fields = Vec::<UnboundPartitionField>::new();
                for (i, (column, transform)) in parse_partition_by_exprs(partition_by.clone())?
                    .into_iter()
                    .enumerate()
                {
                    match iceberg_schema.field_id_by_name(&column) {
                        Some(id) => partition_fields.push(
                            UnboundPartitionField::builder()
                                .source_id(id)
                                .transform(transform)
                                .name(format!("_p_{}", column))
                                .field_id(PARTITION_DATA_ID_START + i as i32)
                                .build(),
                        ),
                        None => bail!(format!(
                            "Partition source column does not exist in schema: {}",
                            column
                        )),
                    };
                }
                Some(
                    UnboundPartitionSpec::builder()
                        .with_spec_id(0)
                        .add_partition_fields(partition_fields)
                        .map_err(|e| SinkError::Iceberg(anyhow!(e)))
                        .context("failed to add partition columns")?
                        .build(),
                )
            }
            None => None,
        };

        let properties = HashMap::from([(
            TableProperties::PROPERTY_FORMAT_VERSION.to_owned(),
            (config.format_version as u8).to_string(),
        )]);

        let table_creation_builder = TableCreation::builder()
            .name(config.table.table_name().to_owned())
            .schema(iceberg_schema)
            .format_version(config.table_format_version())
            .properties(properties);

        let table_creation = match (location, partition_spec) {
            (Some(location), Some(partition_spec)) => table_creation_builder
                .location(location)
                .partition_spec(partition_spec)
                .build(),
            (Some(location), None) => table_creation_builder.location(location).build(),
            (None, Some(partition_spec)) => table_creation_builder
                .partition_spec(partition_spec)
                .build(),
            (None, None) => table_creation_builder.build(),
        };

        catalog
            .create_table(&namespace, table_creation)
            .await
            .map_err(|e| SinkError::Iceberg(anyhow!(e)))
            .context("failed to create iceberg table")?;
    }
    Ok(())
}

impl IcebergSink {
    pub async fn create_and_validate_table(&self) -> Result<Table> {
        create_and_validate_table_impl(&self.config, &self.param).await
    }

    pub async fn create_table_if_not_exists(&self) -> Result<()> {
        create_table_if_not_exists_impl(&self.config, &self.param).await
    }

    pub fn new(config: IcebergConfig, param: SinkParam) -> Result<Self> {
        let unique_column_ids = if config.r#type == SINK_TYPE_UPSERT && !config.force_append_only {
            if let Some(pk) = &config.primary_key {
                let mut unique_column_ids = Vec::with_capacity(pk.len());
                for col_name in pk {
                    let id = param
                        .columns
                        .iter()
                        .find(|col| col.name.as_str() == col_name)
                        .ok_or_else(|| {
                            SinkError::Config(anyhow!(
                                "Primary key column {} not found in sink schema",
                                col_name
                            ))
                        })?
                        .column_id
                        .get_id() as usize;
                    unique_column_ids.push(id);
                }
                Some(unique_column_ids)
            } else {
                unreachable!()
            }
        } else {
            None
        };
        Ok(Self {
            config,
            param,
            unique_column_ids,
        })
    }
}

impl Sink for IcebergSink {
    type LogSinker = CoordinatedLogSinker<IcebergSinkWriter>;

    const SINK_NAME: &'static str = ICEBERG_SINK;

    async fn validate(&self) -> Result<()> {
        if "snowflake".eq_ignore_ascii_case(self.config.catalog_type()) {
            bail!("Snowflake catalog only supports iceberg sources");
        }

        if "glue".eq_ignore_ascii_case(self.config.catalog_type()) {
            risingwave_common::license::Feature::IcebergSinkWithGlue
                .check_available()
                .map_err(|e| anyhow::anyhow!(e))?;
        }

        IcebergConfig::validate_append_only_write_mode(
            &self.config.r#type,
            self.config.write_mode,
        )?;

        let compaction_type = self.config.compaction_type();

        if self.config.write_mode == IcebergWriteMode::CopyOnWrite
            && compaction_type != CompactionType::Full
        {
            bail!(
                "'copy-on-write' mode only supports 'full' compaction type, got: '{}'",
                compaction_type
            );
        }

        match compaction_type {
            CompactionType::SmallFiles => {
                risingwave_common::license::Feature::IcebergCompaction
                    .check_available()
                    .map_err(|e| anyhow::anyhow!(e))?;

                if self.config.write_mode != IcebergWriteMode::MergeOnRead {
                    bail!(
                        "'small-files' compaction type only supports 'merge-on-read' write mode, got: '{}'",
                        self.config.write_mode
                    );
                }

                if self.config.delete_files_count_threshold.is_some() {
                    bail!(
                        "`compaction.delete-files-count-threshold` is not supported for 'small-files' compaction type"
                    );
                }
            }
            CompactionType::FilesWithDelete => {
                risingwave_common::license::Feature::IcebergCompaction
                    .check_available()
                    .map_err(|e| anyhow::anyhow!(e))?;

                if self.config.write_mode != IcebergWriteMode::MergeOnRead {
                    bail!(
                        "'files-with-delete' compaction type only supports 'merge-on-read' write mode, got: '{}'",
                        self.config.write_mode
                    );
                }

                if self.config.small_files_threshold_mb.is_some() {
                    bail!(
                        "`compaction.small-files-threshold-mb` must not be set for 'files-with-delete' compaction type"
                    );
                }
            }
            CompactionType::Full => {}
        }

        let _ = self.create_and_validate_table().await?;
        Ok(())
    }

    fn support_schema_change() -> bool {
        true
    }

    fn validate_alter_config(config: &BTreeMap<String, String>) -> Result<()> {
        let iceberg_config = IcebergConfig::from_btreemap(config.clone())?;

        if let Some(compaction_interval) = iceberg_config.compaction_interval_sec {
            if iceberg_config.enable_compaction && compaction_interval == 0 {
                bail!(
                    "`compaction-interval-sec` must be greater than 0 when `enable-compaction` is true"
                );
            }

            tracing::info!(
                "Alter config compaction_interval set to {} seconds",
                compaction_interval
            );
        }

        if let Some(max_snapshots) = iceberg_config.max_snapshots_num_before_compaction
            && max_snapshots < 1
        {
            bail!(
                "`compaction.max-snapshots-num` must be greater than 0, got: {}",
                max_snapshots
            );
        }

        Ok(())
    }

    async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
        let writer = IcebergSinkWriter::new(
            self.config.clone(),
            self.param.clone(),
            writer_param.clone(),
            self.unique_column_ids.clone(),
        );

        let commit_checkpoint_interval =
            NonZeroU64::new(self.config.commit_checkpoint_interval).expect(
                "commit_checkpoint_interval should be greater than 0, and it should be checked in config validation",
            );
        let log_sinker = CoordinatedLogSinker::new(
            &writer_param,
            self.param.clone(),
            writer,
            commit_checkpoint_interval,
        )
        .await?;

        Ok(log_sinker)
    }

    fn is_coordinated_sink(&self) -> bool {
        true
    }

    async fn new_coordinator(
        &self,
        iceberg_compact_stat_sender: Option<UnboundedSender<IcebergSinkCompactionUpdate>>,
    ) -> Result<SinkCommitCoordinator> {
        let catalog = self.config.create_catalog().await?;
        let table = self.create_and_validate_table().await?;
        let coordinator = IcebergSinkCommitter {
            catalog,
            table,
            last_commit_epoch: 0,
            sink_id: self.param.sink_id,
            config: self.config.clone(),
            param: self.param.clone(),
            commit_retry_num: self.config.commit_retry_num,
            iceberg_compact_stat_sender,
        };
        if self.config.is_exactly_once.unwrap_or_default() {
            Ok(SinkCommitCoordinator::TwoPhase(Box::new(coordinator)))
        } else {
            Ok(SinkCommitCoordinator::SinglePhase(Box::new(coordinator)))
        }
    }
}

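/// Tracks the column projection used to drop the extra partition column from incoming
/// chunks: not needed, not yet computed (holding the extra column's index), or the
/// computed projection itself.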
enum ProjectIdxVec {
    None,
    Prepare(usize),
    Done(Vec<usize>),
}

type DataFileWriterBuilderType =
    DataFileWriterBuilder<ParquetWriterBuilder, DefaultLocationGenerator, DefaultFileNameGenerator>;
type PositionDeleteFileWriterBuilderType = PositionDeleteFileWriterBuilder<
    ParquetWriterBuilder,
    DefaultLocationGenerator,
    DefaultFileNameGenerator,
>;
type PositionDeleteFileWriterType = PositionDeleteFileWriter<
    ParquetWriterBuilder,
    DefaultLocationGenerator,
    DefaultFileNameGenerator,
>;
type DeletionVectorWriterBuilderType =
    DeletionVectorWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>;
type DeletionVectorWriterType =
    DeletionVectorWriter<DefaultLocationGenerator, DefaultFileNameGenerator>;
type EqualityDeleteFileWriterBuilderType = EqualityDeleteFileWriterBuilder<
    ParquetWriterBuilder,
    DefaultLocationGenerator,
    DefaultFileNameGenerator,
>;

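/// Position deletes are written either as classic position-delete Parquet files or, for
/// format v3 tables, as Puffin deletion vectors. The two enums below let the delta
/// writer stay agnostic of which variant is in use.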
#[derive(Clone)]
enum PositionDeleteWriterBuilderType {
    PositionDelete(PositionDeleteFileWriterBuilderType),
    DeletionVector(DeletionVectorWriterBuilderType),
}

enum PositionDeleteWriterType {
    PositionDelete(PositionDeleteFileWriterType),
    DeletionVector(DeletionVectorWriterType),
}

#[async_trait]
impl IcebergWriterBuilder<Vec<PositionDeleteInput>> for PositionDeleteWriterBuilderType {
    type R = PositionDeleteWriterType;

    async fn build(
        self,
        partition_key: Option<iceberg::spec::PartitionKey>,
    ) -> iceberg::Result<Self::R> {
        match self {
            PositionDeleteWriterBuilderType::PositionDelete(builder) => Ok(
                PositionDeleteWriterType::PositionDelete(builder.build(partition_key).await?),
            ),
            PositionDeleteWriterBuilderType::DeletionVector(builder) => Ok(
                PositionDeleteWriterType::DeletionVector(builder.build(partition_key).await?),
            ),
        }
    }
}

#[async_trait]
impl IcebergWriter<Vec<PositionDeleteInput>> for PositionDeleteWriterType {
    async fn write(&mut self, input: Vec<PositionDeleteInput>) -> iceberg::Result<()> {
        match self {
            PositionDeleteWriterType::PositionDelete(writer) => writer.write(input).await,
            PositionDeleteWriterType::DeletionVector(writer) => writer.write(input).await,
        }
    }

    async fn close(&mut self) -> iceberg::Result<Vec<DataFile>> {
        match self {
            PositionDeleteWriterType::PositionDelete(writer) => writer.close().await,
            PositionDeleteWriterType::DeletionVector(writer) => writer.close().await,
        }
    }
}

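/// Wraps an `IcebergWriterBuilder` into a `TaskWriter`, attaching a partition splitter
/// when the table is partitioned and choosing between computed and precomputed
/// partition values based on `compute_partition`.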
#[derive(Clone)]
struct TaskWriterBuilderWrapper<B: IcebergWriterBuilder> {
    inner: B,
    fanout_enabled: bool,
    schema: IcebergSchemaRef,
    partition_spec: PartitionSpecRef,
    compute_partition: bool,
}

impl<B: IcebergWriterBuilder> TaskWriterBuilderWrapper<B> {
    fn new(
        inner: B,
        fanout_enabled: bool,
        schema: IcebergSchemaRef,
        partition_spec: PartitionSpecRef,
        compute_partition: bool,
    ) -> Self {
        Self {
            inner,
            fanout_enabled,
            schema,
            partition_spec,
            compute_partition,
        }
    }

    fn build(self) -> iceberg::Result<TaskWriter<B>> {
        let partition_splitter = match (
            self.partition_spec.is_unpartitioned(),
            self.compute_partition,
        ) {
            (true, _) => None,
            (false, true) => Some(RecordBatchPartitionSplitter::new_with_computed_values(
                self.schema.clone(),
                self.partition_spec.clone(),
            )?),
            (false, false) => Some(RecordBatchPartitionSplitter::new_with_precomputed_values(
                self.schema.clone(),
                self.partition_spec.clone(),
            )?),
        };

        Ok(TaskWriter::new_with_partition_splitter(
            self.inner,
            self.fanout_enabled,
            self.schema,
            self.partition_spec,
            partition_splitter,
        ))
    }
}

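/// The sink writer starts in `Created` (holding only its construction arguments) and is
/// promoted to `Initialized` on the first `begin_epoch`, once the table has been loaded
/// and validated.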
pub enum IcebergSinkWriter {
    Created(IcebergSinkWriterArgs),
    Initialized(IcebergSinkWriterInner),
}

pub struct IcebergSinkWriterArgs {
    config: IcebergConfig,
    sink_param: SinkParam,
    writer_param: SinkWriterParam,
    unique_column_ids: Option<Vec<usize>>,
}

pub struct IcebergSinkWriterInner {
    writer: IcebergWriterDispatch,
    arrow_schema: SchemaRef,
    metrics: IcebergWriterMetrics,
    table: Table,
    project_idx_vec: ProjectIdxVec,
}

#[allow(clippy::type_complexity)]
enum IcebergWriterDispatch {
    Append {
        writer: Option<Box<dyn IcebergWriter>>,
        writer_builder:
            TaskWriterBuilderWrapper<MonitoredGeneralWriterBuilder<DataFileWriterBuilderType>>,
    },
    Upsert {
        writer: Option<Box<dyn IcebergWriter>>,
        writer_builder: TaskWriterBuilderWrapper<
            MonitoredGeneralWriterBuilder<
                DeltaWriterBuilder<
                    DataFileWriterBuilderType,
                    PositionDeleteWriterBuilderType,
                    EqualityDeleteFileWriterBuilderType,
                >,
            >,
        >,
        arrow_schema_with_op_column: SchemaRef,
    },
}

impl IcebergWriterDispatch {
    pub fn get_writer(&mut self) -> Option<&mut Box<dyn IcebergWriter>> {
        match self {
            IcebergWriterDispatch::Append { writer, .. }
            | IcebergWriterDispatch::Upsert { writer, .. } => writer.as_mut(),
        }
    }
}

pub struct IcebergWriterMetrics {
    _write_qps: LabelGuardedIntCounter,
    _write_latency: LabelGuardedHistogram,
    write_bytes: LabelGuardedIntCounter,
}

impl IcebergSinkWriter {
    pub fn new(
        config: IcebergConfig,
        sink_param: SinkParam,
        writer_param: SinkWriterParam,
        unique_column_ids: Option<Vec<usize>>,
    ) -> Self {
        Self::Created(IcebergSinkWriterArgs {
            config,
            sink_param,
            writer_param,
            unique_column_ids,
        })
    }
}

impl IcebergSinkWriterInner {
    fn build_append_only(table: Table, writer_param: &SinkWriterParam) -> Result<Self> {
        let SinkWriterParam {
            extra_partition_col_idx,
            actor_id,
            sink_id,
            sink_name,
            ..
        } = writer_param;
        let metrics_labels = [
            &actor_id.to_string(),
            &sink_id.to_string(),
            sink_name.as_str(),
        ];

        let write_qps = GLOBAL_SINK_METRICS
            .iceberg_write_qps
            .with_guarded_label_values(&metrics_labels);
        let write_latency = GLOBAL_SINK_METRICS
            .iceberg_write_latency
            .with_guarded_label_values(&metrics_labels);
        let _rolling_unflushed_data_file = GLOBAL_SINK_METRICS
            .iceberg_rolling_unflushed_data_file
            .with_guarded_label_values(&metrics_labels);
        let write_bytes = GLOBAL_SINK_METRICS
            .iceberg_write_bytes
            .with_guarded_label_values(&metrics_labels);

        let schema = table.metadata().current_schema();
        let partition_spec = table.metadata().default_partition_spec();
        let fanout_enabled = !partition_spec.fields().is_empty();
        let unique_uuid_suffix = Uuid::now_v7();

        let parquet_writer_properties = WriterProperties::builder()
            .set_max_row_group_size(
                writer_param
                    .streaming_config
                    .developer
                    .iceberg_sink_write_parquet_max_row_group_rows,
            )
            .build();

        let parquet_writer_builder =
            ParquetWriterBuilder::new(parquet_writer_properties, schema.clone());
        let rolling_builder = RollingFileWriterBuilder::new_with_default_file_size(
            parquet_writer_builder,
            table.file_io().clone(),
            DefaultLocationGenerator::new(table.metadata().clone())
                .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
            DefaultFileNameGenerator::new(
                writer_param.actor_id.to_string(),
                Some(unique_uuid_suffix.to_string()),
                iceberg::spec::DataFileFormat::Parquet,
            ),
        );
        let data_file_builder = DataFileWriterBuilder::new(rolling_builder);
        let monitored_builder = MonitoredGeneralWriterBuilder::new(
            data_file_builder,
            write_qps.clone(),
            write_latency.clone(),
        );
        let writer_builder = TaskWriterBuilderWrapper::new(
            monitored_builder,
            fanout_enabled,
            schema.clone(),
            partition_spec.clone(),
            true,
        );
        let inner_writer = Some(Box::new(
            writer_builder
                .clone()
                .build()
                .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
        ) as Box<dyn IcebergWriter>);
        Ok(Self {
            arrow_schema: Arc::new(
                schema_to_arrow_schema(table.metadata().current_schema())
                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
            ),
            metrics: IcebergWriterMetrics {
                _write_qps: write_qps,
                _write_latency: write_latency,
                write_bytes,
            },
            writer: IcebergWriterDispatch::Append {
                writer: inner_writer,
                writer_builder,
            },
            table,
            project_idx_vec: {
                if let Some(extra_partition_col_idx) = extra_partition_col_idx {
                    ProjectIdxVec::Prepare(*extra_partition_col_idx)
                } else {
                    ProjectIdxVec::None
                }
            },
        })
    }

    fn build_upsert(
        table: Table,
        unique_column_ids: Vec<usize>,
        writer_param: &SinkWriterParam,
    ) -> Result<Self> {
        let SinkWriterParam {
            extra_partition_col_idx,
            actor_id,
            sink_id,
            sink_name,
            ..
        } = writer_param;
        let metrics_labels = [
            &actor_id.to_string(),
            &sink_id.to_string(),
            sink_name.as_str(),
        ];
        let unique_column_ids: Vec<_> = unique_column_ids.into_iter().map(|id| id as i32).collect();

        let write_qps = GLOBAL_SINK_METRICS
            .iceberg_write_qps
            .with_guarded_label_values(&metrics_labels);
        let write_latency = GLOBAL_SINK_METRICS
            .iceberg_write_latency
            .with_guarded_label_values(&metrics_labels);
        let _rolling_unflushed_data_file = GLOBAL_SINK_METRICS
            .iceberg_rolling_unflushed_data_file
            .with_guarded_label_values(&metrics_labels);
        let write_bytes = GLOBAL_SINK_METRICS
            .iceberg_write_bytes
            .with_guarded_label_values(&metrics_labels);

        let schema = table.metadata().current_schema();
        let partition_spec = table.metadata().default_partition_spec();
        let fanout_enabled = !partition_spec.fields().is_empty();
        let use_deletion_vectors = table.metadata().format_version() >= FormatVersion::V3;

        let unique_uuid_suffix = Uuid::now_v7();

        let parquet_writer_properties = WriterProperties::builder()
            .set_max_row_group_size(
                writer_param
                    .streaming_config
                    .developer
                    .iceberg_sink_write_parquet_max_row_group_rows,
            )
            .build();

        let data_file_builder = {
            let parquet_writer_builder =
                ParquetWriterBuilder::new(parquet_writer_properties.clone(), schema.clone());
            let rolling_writer_builder = RollingFileWriterBuilder::new_with_default_file_size(
                parquet_writer_builder,
                table.file_io().clone(),
                DefaultLocationGenerator::new(table.metadata().clone())
                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
                DefaultFileNameGenerator::new(
                    writer_param.actor_id.to_string(),
                    Some(unique_uuid_suffix.to_string()),
                    iceberg::spec::DataFileFormat::Parquet,
                ),
            );
            DataFileWriterBuilder::new(rolling_writer_builder)
        };
        let position_delete_builder = if use_deletion_vectors {
            let location_generator = DefaultLocationGenerator::new(table.metadata().clone())
                .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
            PositionDeleteWriterBuilderType::DeletionVector(DeletionVectorWriterBuilder::new(
                table.file_io().clone(),
                location_generator,
                DefaultFileNameGenerator::new(
                    writer_param.actor_id.to_string(),
                    Some(format!("delvec-{}", unique_uuid_suffix)),
                    iceberg::spec::DataFileFormat::Puffin,
                ),
            ))
        } else {
            let parquet_writer_builder = ParquetWriterBuilder::new(
                parquet_writer_properties.clone(),
                POSITION_DELETE_SCHEMA.clone().into(),
            );
            let rolling_writer_builder = RollingFileWriterBuilder::new_with_default_file_size(
                parquet_writer_builder,
                table.file_io().clone(),
                DefaultLocationGenerator::new(table.metadata().clone())
                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
                DefaultFileNameGenerator::new(
                    writer_param.actor_id.to_string(),
                    Some(format!("pos-del-{}", unique_uuid_suffix)),
                    iceberg::spec::DataFileFormat::Parquet,
                ),
            );
            PositionDeleteWriterBuilderType::PositionDelete(PositionDeleteFileWriterBuilder::new(
                rolling_writer_builder,
            ))
        };
        let equality_delete_builder = {
            let config = EqualityDeleteWriterConfig::new(
                unique_column_ids.clone(),
                table.metadata().current_schema().clone(),
            )
            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
            let parquet_writer_builder = ParquetWriterBuilder::new(
                parquet_writer_properties,
                Arc::new(
                    arrow_schema_to_schema(config.projected_arrow_schema_ref())
                        .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
                ),
            );
            let rolling_writer_builder = RollingFileWriterBuilder::new_with_default_file_size(
                parquet_writer_builder,
                table.file_io().clone(),
                DefaultLocationGenerator::new(table.metadata().clone())
                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
                DefaultFileNameGenerator::new(
                    writer_param.actor_id.to_string(),
                    Some(format!("eq-del-{}", unique_uuid_suffix)),
                    iceberg::spec::DataFileFormat::Parquet,
                ),
            );

            EqualityDeleteFileWriterBuilder::new(rolling_writer_builder, config)
        };
        let delta_builder = DeltaWriterBuilder::new(
            data_file_builder,
            position_delete_builder,
            equality_delete_builder,
            unique_column_ids,
            schema.clone(),
        );
        let original_arrow_schema = Arc::new(
            schema_to_arrow_schema(table.metadata().current_schema())
                .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
        );
        let schema_with_extra_op_column = {
            let mut new_fields = original_arrow_schema.fields().iter().cloned().collect_vec();
            new_fields.push(Arc::new(ArrowField::new(
                "op".to_owned(),
                ArrowDataType::Int32,
                false,
            )));
            Arc::new(ArrowSchema::new(new_fields))
        };
        let writer_builder = TaskWriterBuilderWrapper::new(
            MonitoredGeneralWriterBuilder::new(
                delta_builder,
                write_qps.clone(),
                write_latency.clone(),
            ),
            fanout_enabled,
            schema.clone(),
            partition_spec.clone(),
            true,
        );
        let inner_writer = Some(Box::new(
            writer_builder
                .clone()
                .build()
                .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
        ) as Box<dyn IcebergWriter>);
        Ok(Self {
            arrow_schema: original_arrow_schema,
            metrics: IcebergWriterMetrics {
                _write_qps: write_qps,
                _write_latency: write_latency,
                write_bytes,
            },
            table,
            writer: IcebergWriterDispatch::Upsert {
                writer: inner_writer,
                writer_builder,
                arrow_schema_with_op_column: schema_with_extra_op_column,
            },
            project_idx_vec: {
                if let Some(extra_partition_col_idx) = extra_partition_col_idx {
                    ProjectIdxVec::Prepare(*extra_partition_col_idx)
                } else {
                    ProjectIdxVec::None
                }
            },
        })
    }
}

#[async_trait]
impl SinkWriter for IcebergSinkWriter {
    type CommitMetadata = Option<SinkMetadata>;

    async fn begin_epoch(&mut self, _epoch: u64) -> Result<()> {
        let Self::Created(args) = self else {
            return Ok(());
        };

        let table = create_and_validate_table_impl(&args.config, &args.sink_param).await?;
        let inner = match &args.unique_column_ids {
            Some(unique_column_ids) => IcebergSinkWriterInner::build_upsert(
                table,
                unique_column_ids.clone(),
                &args.writer_param,
            )?,
            None => IcebergSinkWriterInner::build_append_only(table, &args.writer_param)?,
        };

        *self = IcebergSinkWriter::Initialized(inner);
        Ok(())
    }

    async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> {
        let Self::Initialized(inner) = self else {
            unreachable!("IcebergSinkWriter should be initialized before barrier");
        };

        match &mut inner.writer {
            IcebergWriterDispatch::Append {
                writer,
                writer_builder,
            } => {
                if writer.is_none() {
                    *writer = Some(Box::new(
                        writer_builder
                            .clone()
                            .build()
                            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
                    ));
                }
            }
            IcebergWriterDispatch::Upsert {
                writer,
                writer_builder,
                ..
            } => {
                if writer.is_none() {
                    *writer = Some(Box::new(
                        writer_builder
                            .clone()
                            .build()
                            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
                    ));
                }
            }
        };

        let (mut chunk, ops) = chunk.compact_vis().into_parts();
        match &mut inner.project_idx_vec {
            ProjectIdxVec::None => {}
            ProjectIdxVec::Prepare(idx) => {
                if *idx >= chunk.columns().len() {
                    return Err(SinkError::Iceberg(anyhow!(
                        "invalid extra partition column index {}",
                        idx
                    )));
                }
                let project_idx_vec = (0..*idx)
                    .chain(*idx + 1..chunk.columns().len())
                    .collect_vec();
                chunk = chunk.project(&project_idx_vec);
                inner.project_idx_vec = ProjectIdxVec::Done(project_idx_vec);
            }
            ProjectIdxVec::Done(idx_vec) => {
                chunk = chunk.project(idx_vec);
            }
        }
        if ops.is_empty() {
            return Ok(());
        }
        let write_batch_size = chunk.estimated_heap_size();
        let batch = match &inner.writer {
            IcebergWriterDispatch::Append { .. } => {
                let filters =
                    chunk.visibility() & ops.iter().map(|op| *op == Op::Insert).collect::<Bitmap>();
                chunk.set_visibility(filters);
                IcebergArrowConvert
                    .to_record_batch(inner.arrow_schema.clone(), &chunk.compact_vis())
                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?
            }
            IcebergWriterDispatch::Upsert {
                arrow_schema_with_op_column,
                ..
            } => {
                let chunk = IcebergArrowConvert
                    .to_record_batch(inner.arrow_schema.clone(), &chunk)
                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
                let ops = Arc::new(Int32Array::from(
                    ops.iter()
                        .map(|op| match op {
                            Op::UpdateInsert | Op::Insert => INSERT_OP,
                            Op::UpdateDelete | Op::Delete => DELETE_OP,
                        })
                        .collect_vec(),
                ));
                let mut columns = chunk.columns().to_vec();
                columns.push(ops);
                RecordBatch::try_new(arrow_schema_with_op_column.clone(), columns)
                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?
            }
        };

        let writer = inner.writer.get_writer().unwrap();
        writer
            .write(batch)
            .instrument_await("iceberg_write")
            .await
            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
        inner.metrics.write_bytes.inc_by(write_batch_size as _);
        Ok(())
    }

    async fn barrier(&mut self, is_checkpoint: bool) -> Result<Option<SinkMetadata>> {
        let Self::Initialized(inner) = self else {
            unreachable!("IcebergSinkWriter should be initialized before barrier");
        };

        if !is_checkpoint {
            return Ok(None);
        }

        let close_result = match &mut inner.writer {
            IcebergWriterDispatch::Append {
                writer,
                writer_builder,
            } => {
                let close_result = match writer.take() {
                    Some(mut writer) => {
                        Some(writer.close().instrument_await("iceberg_close").await)
                    }
                    _ => None,
                };
                match writer_builder.clone().build() {
                    Ok(new_writer) => {
                        *writer = Some(Box::new(new_writer));
                    }
                    _ => {
                        warn!("Failed to build new writer after close");
                    }
                }
                close_result
            }
            IcebergWriterDispatch::Upsert {
                writer,
                writer_builder,
                ..
            } => {
                let close_result = match writer.take() {
                    Some(mut writer) => {
                        Some(writer.close().instrument_await("iceberg_close").await)
                    }
                    _ => None,
                };
                match writer_builder.clone().build() {
                    Ok(new_writer) => {
                        *writer = Some(Box::new(new_writer));
                    }
                    _ => {
                        warn!("Failed to build new writer after close");
                    }
                }
                close_result
            }
        };

        match close_result {
            Some(Ok(result)) => {
                let format_version = inner.table.metadata().format_version();
                let partition_type = inner.table.metadata().default_partition_type();
                let data_files = result
                    .into_iter()
                    .map(|f| {
                        let truncated = truncate_datafile(f);
                        SerializedDataFile::try_from(truncated, partition_type, format_version)
                            .map_err(|err| SinkError::Iceberg(anyhow!(err)))
                    })
                    .collect::<Result<Vec<_>>>()?;
                Ok(Some(SinkMetadata::try_from(&IcebergCommitResult {
                    data_files,
                    schema_id: inner.table.metadata().current_schema_id(),
                    partition_spec_id: inner.table.metadata().default_partition_spec_id(),
                })?))
            }
            Some(Err(err)) => Err(SinkError::Iceberg(anyhow!(err))),
            None => Err(SinkError::Iceberg(anyhow!("No writer to close"))),
        }
    }
}

const SCHEMA_ID: &str = "schema_id";
const PARTITION_SPEC_ID: &str = "partition_spec_id";
const DATA_FILES: &str = "data_files";

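/// Maximum serialized size (in bytes) of a per-column lower/upper bound statistic kept
/// in the data file metadata; larger statistics are dropped by `truncate_datafile`
/// before the metadata is sent to the commit coordinator.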
const MAX_COLUMN_STAT_SIZE: usize = 10240;

fn truncate_datafile(mut data_file: DataFile) -> DataFile {
    data_file.lower_bounds.retain(|field_id, datum| {
        let size = match datum.to_bytes() {
            Ok(bytes) => bytes.len(),
            Err(_) => 0,
        };

        if size > MAX_COLUMN_STAT_SIZE {
            tracing::debug!(
                field_id = field_id,
                size = size,
                "Truncating large lower_bound statistic"
            );
            return false;
        }
        true
    });

    data_file.upper_bounds.retain(|field_id, datum| {
        let size = match datum.to_bytes() {
            Ok(bytes) => bytes.len(),
            Err(_) => 0,
        };

        if size > MAX_COLUMN_STAT_SIZE {
            tracing::debug!(
                field_id = field_id,
                size = size,
                "Truncating large upper_bound statistic"
            );
            return false;
        }
        true
    });

    data_file
}

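/// Per-writer commit payload: the data files produced for one checkpoint, together with
/// the schema id and partition spec id they were written against. It is serialized to
/// JSON and carried to the coordinator inside `SinkMetadata`.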
#[derive(Default, Clone)]
struct IcebergCommitResult {
    schema_id: i32,
    partition_spec_id: i32,
    data_files: Vec<SerializedDataFile>,
}

impl IcebergCommitResult {
    fn try_from(value: &SinkMetadata) -> Result<Self> {
        if let Some(Serialized(v)) = &value.metadata {
            let mut values = if let serde_json::Value::Object(v) =
                serde_json::from_slice::<serde_json::Value>(&v.metadata)
                    .context("Can't parse iceberg sink metadata")?
            {
                v
            } else {
                bail!("iceberg sink metadata should be an object");
            };

            let schema_id;
            if let Some(serde_json::Value::Number(value)) = values.remove(SCHEMA_ID) {
                schema_id = value
                    .as_u64()
                    .ok_or_else(|| anyhow!("schema_id should be a u64"))?;
            } else {
                bail!("iceberg sink metadata should have schema_id");
            }

            let partition_spec_id;
            if let Some(serde_json::Value::Number(value)) = values.remove(PARTITION_SPEC_ID) {
                partition_spec_id = value
                    .as_u64()
                    .ok_or_else(|| anyhow!("partition_spec_id should be a u64"))?;
            } else {
                bail!("iceberg sink metadata should have partition_spec_id");
            }

            let data_files: Vec<SerializedDataFile>;
            if let serde_json::Value::Array(values) = values
                .remove(DATA_FILES)
                .ok_or_else(|| anyhow!("iceberg sink metadata should have data_files object"))?
            {
                data_files = values
                    .into_iter()
                    .map(from_value::<SerializedDataFile>)
                    .collect::<std::result::Result<_, _>>()
                    .unwrap();
            } else {
                bail!("iceberg sink metadata should have data_files object");
            }

            Ok(Self {
                schema_id: schema_id as i32,
                partition_spec_id: partition_spec_id as i32,
                data_files,
            })
        } else {
            bail!("Can't create iceberg sink write result from empty data!")
        }
    }

    fn try_from_serialized_bytes(value: Vec<u8>) -> Result<Self> {
        let mut values = if let serde_json::Value::Object(value) =
            serde_json::from_slice::<serde_json::Value>(&value)
                .context("Can't parse iceberg sink metadata")?
        {
            value
        } else {
            bail!("iceberg sink metadata should be an object");
        };

        let schema_id;
        if let Some(serde_json::Value::Number(value)) = values.remove(SCHEMA_ID) {
            schema_id = value
                .as_u64()
                .ok_or_else(|| anyhow!("schema_id should be a u64"))?;
        } else {
            bail!("iceberg sink metadata should have schema_id");
        }

        let partition_spec_id;
        if let Some(serde_json::Value::Number(value)) = values.remove(PARTITION_SPEC_ID) {
            partition_spec_id = value
                .as_u64()
                .ok_or_else(|| anyhow!("partition_spec_id should be a u64"))?;
        } else {
            bail!("iceberg sink metadata should have partition_spec_id");
        }

        let data_files: Vec<SerializedDataFile>;
        if let serde_json::Value::Array(values) = values
            .remove(DATA_FILES)
            .ok_or_else(|| anyhow!("iceberg sink metadata should have data_files object"))?
        {
            data_files = values
                .into_iter()
                .map(from_value::<SerializedDataFile>)
                .collect::<std::result::Result<_, _>>()
                .unwrap();
        } else {
            bail!("iceberg sink metadata should have data_files object");
        }

        Ok(Self {
            schema_id: schema_id as i32,
            partition_spec_id: partition_spec_id as i32,
            data_files,
        })
    }
}
1957
1958impl<'a> TryFrom<&'a IcebergCommitResult> for SinkMetadata {
1959 type Error = SinkError;
1960
1961 fn try_from(value: &'a IcebergCommitResult) -> std::result::Result<SinkMetadata, Self::Error> {
1962 let json_data_files = serde_json::Value::Array(
1963 value
1964 .data_files
1965 .iter()
1966 .map(serde_json::to_value)
1967 .collect::<std::result::Result<Vec<serde_json::Value>, _>>()
1968 .context("Can't serialize data files to json")?,
1969 );
1970 let json_value = serde_json::Value::Object(
1971 vec![
1972 (
1973 SCHEMA_ID.to_owned(),
1974 serde_json::Value::Number(value.schema_id.into()),
1975 ),
1976 (
1977 PARTITION_SPEC_ID.to_owned(),
1978 serde_json::Value::Number(value.partition_spec_id.into()),
1979 ),
1980 (DATA_FILES.to_owned(), json_data_files),
1981 ]
1982 .into_iter()
1983 .collect(),
1984 );
1985 Ok(SinkMetadata {
1986 metadata: Some(Serialized(SerializedMetadata {
1987 metadata: serde_json::to_vec(&json_value)
1988 .context("Can't serialize iceberg sink metadata")?,
1989 })),
1990 })
1991 }
1992}
1993
1994impl TryFrom<IcebergCommitResult> for Vec<u8> {
1995 type Error = SinkError;
1996
1997 fn try_from(value: IcebergCommitResult) -> std::result::Result<Vec<u8>, Self::Error> {
1998 let json_data_files = serde_json::Value::Array(
1999 value
2000 .data_files
2001 .iter()
2002 .map(serde_json::to_value)
2003 .collect::<std::result::Result<Vec<serde_json::Value>, _>>()
2004 .context("Can't serialize data files to json")?,
2005 );
2006 let json_value = serde_json::Value::Object(
2007 vec![
2008 (
2009 SCHEMA_ID.to_owned(),
2010 serde_json::Value::Number(value.schema_id.into()),
2011 ),
2012 (
2013 PARTITION_SPEC_ID.to_owned(),
2014 serde_json::Value::Number(value.partition_spec_id.into()),
2015 ),
2016 (DATA_FILES.to_owned(), json_data_files),
2017 ]
2018 .into_iter()
2019 .collect(),
2020 );
2021 Ok(serde_json::to_vec(&json_value).context("Can't serialize iceberg sink metadata")?)
2022 }
2023}
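
/// Coordinator-side committer for the Iceberg sink. It gathers the data files reported by
/// the parallel sink writers, appends them to the Iceberg table as a single snapshot, and
/// applies committed schema changes through the catalog.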
2024pub struct IcebergSinkCommitter {
2025 catalog: Arc<dyn Catalog>,
2026 table: Table,
2027 pub last_commit_epoch: u64,
2028 pub(crate) sink_id: SinkId,
2029 pub(crate) config: IcebergConfig,
2030 pub(crate) param: SinkParam,
2031 commit_retry_num: u32,
2032 pub(crate) iceberg_compact_stat_sender: Option<UnboundedSender<IcebergSinkCompactionUpdate>>,
2033}
2034
2035impl IcebergSinkCommitter {
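    /// Reload the table from the catalog and verify that its current schema id and default
    /// partition spec id still match the ids the pending data files were written with; any
    /// schema or partition evolution in between is rejected.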
2036 async fn reload_table(
2039 catalog: &dyn Catalog,
2040 table_ident: &TableIdent,
2041 schema_id: i32,
2042 partition_spec_id: i32,
2043 ) -> Result<Table> {
2044 let table = catalog
2045 .load_table(table_ident)
2046 .await
2047 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
2048 if table.metadata().current_schema_id() != schema_id {
2049 return Err(SinkError::Iceberg(anyhow!(
2050 "Schema evolution not supported, expect schema id {}, but got {}",
2051 schema_id,
2052 table.metadata().current_schema_id()
2053 )));
2054 }
2055 if table.metadata().default_partition_spec_id() != partition_spec_id {
2056 return Err(SinkError::Iceberg(anyhow!(
2057 "Partition evolution not supported, expect partition spec id {}, but got {}",
2058 partition_spec_id,
2059 table.metadata().default_partition_spec_id()
2060 )));
2061 }
2062 Ok(table)
2063 }
2064}
2065
2066#[async_trait]
2067impl SinglePhaseCommitCoordinator for IcebergSinkCommitter {
2068 async fn init(&mut self) -> Result<()> {
2069 tracing::info!(
2070 sink_id = %self.param.sink_id,
2071 "Iceberg sink coordinator initialized",
2072 );
2073
2074 Ok(())
2075 }
2076
2077 async fn commit_data(&mut self, epoch: u64, metadata: Vec<SinkMetadata>) -> Result<()> {
2078 tracing::debug!("Starting iceberg direct commit in epoch {epoch}");
2079
2080 if metadata.is_empty() {
2081 tracing::debug!(?epoch, "No datafile to commit");
2082 return Ok(());
2083 }
2084
2085 if let Some((write_results, snapshot_id)) = self.pre_commit_inner(epoch, metadata)? {
2087 self.commit_data_impl(epoch, write_results, snapshot_id)
2088 .await?;
2089 }
2090
2091 Ok(())
2092 }
2093
2094 async fn commit_schema_change(
2095 &mut self,
2096 epoch: u64,
2097 schema_change: PbSinkSchemaChange,
2098 ) -> Result<()> {
2099 tracing::info!(
2100 "Committing schema change {:?} in epoch {}",
2101 schema_change,
2102 epoch
2103 );
2104 self.commit_schema_change_impl(schema_change).await?;
2105 tracing::info!("Successfully committed schema change in epoch {}", epoch);
2106
2107 Ok(())
2108 }
2109}
2110
2111#[async_trait]
2112impl TwoPhaseCommitCoordinator for IcebergSinkCommitter {
2113 async fn init(&mut self) -> Result<()> {
2114 tracing::info!(
2115 sink_id = %self.param.sink_id,
2116 "Iceberg sink coordinator initialized",
2117 );
2118
2119 Ok(())
2120 }
2121
2122 async fn pre_commit(
2123 &mut self,
2124 epoch: u64,
2125 metadata: Vec<SinkMetadata>,
2126 _schema_change: Option<PbSinkSchemaChange>,
2127 ) -> Result<Option<Vec<u8>>> {
2128 tracing::debug!("Starting iceberg pre commit in epoch {epoch}");
2129
2130 let (write_results, snapshot_id) = match self.pre_commit_inner(epoch, metadata)? {
2131 Some((write_results, snapshot_id)) => (write_results, snapshot_id),
2132 None => {
2133 tracing::debug!(?epoch, "no data to pre commit");
2134 return Ok(None);
2135 }
2136 };
2137
2138 let mut write_results_bytes = Vec::new();
2139 for each_parallelism_write_result in write_results {
2140 let each_parallelism_write_result_bytes: Vec<u8> =
2141 each_parallelism_write_result.try_into()?;
2142 write_results_bytes.push(each_parallelism_write_result_bytes);
2143 }
2144
2145 let snapshot_id_bytes: Vec<u8> = snapshot_id.to_le_bytes().to_vec();
2146 write_results_bytes.push(snapshot_id_bytes);
2147
2148 let pre_commit_metadata_bytes: Vec<u8> = serialize_metadata(write_results_bytes);
2149 Ok(Some(pre_commit_metadata_bytes))
2150 }
2151
2152 async fn commit_data(&mut self, epoch: u64, commit_metadata: Vec<u8>) -> Result<()> {
2153 tracing::debug!("Starting iceberg commit in epoch {epoch}");
2154
2155 if commit_metadata.is_empty() {
2156 tracing::debug!(?epoch, "No datafile to commit");
2157 return Ok(());
2158 }
2159
2160 let mut payload = deserialize_metadata(commit_metadata);
2162 if payload.is_empty() {
2163 return Err(SinkError::Iceberg(anyhow!(
2164 "Invalid commit metadata: empty payload"
2165 )));
2166 }
2167
2168 let snapshot_id_bytes = payload.pop().ok_or_else(|| {
2170 SinkError::Iceberg(anyhow!("Invalid commit metadata: missing snapshot_id"))
2171 })?;
2172 let snapshot_id = i64::from_le_bytes(
2173 snapshot_id_bytes
2174 .try_into()
2175 .map_err(|_| SinkError::Iceberg(anyhow!("Invalid snapshot id bytes")))?,
2176 );
2177
2178 let write_results = payload
2180 .into_iter()
2181 .map(IcebergCommitResult::try_from_serialized_bytes)
2182 .collect::<Result<Vec<_>>>()?;
2183
2184 let snapshot_committed = self
2185 .is_snapshot_id_in_iceberg(&self.config, snapshot_id)
2186 .await?;
2187
2188 if snapshot_committed {
2189 tracing::info!(
2190 "Snapshot id {} already committed in iceberg table, skip committing again.",
2191 snapshot_id
2192 );
2193 return Ok(());
2194 }
2195
2196 self.commit_data_impl(epoch, write_results, snapshot_id)
2197 .await
2198 }
2199
2200 async fn commit_schema_change(
2201 &mut self,
2202 epoch: u64,
2203 schema_change: PbSinkSchemaChange,
2204 ) -> Result<()> {
2205 let schema_updated = self.check_schema_change_applied(&schema_change)?;
2206 if schema_updated {
2207 tracing::info!("Schema change already committed in epoch {}, skip", epoch);
2208 return Ok(());
2209 }
2210
2211 tracing::info!(
2212 "Committing schema change {:?} in epoch {}",
2213 schema_change,
2214 epoch
2215 );
2216 self.commit_schema_change_impl(schema_change).await?;
2217 tracing::info!("Successfully committed schema change in epoch {epoch}");
2218
2219 Ok(())
2220 }
2221
2222 async fn abort(&mut self, _epoch: u64, _commit_metadata: Vec<u8>) {
2223 tracing::debug!("Abort not implemented yet");
2225 }
2226}
2227
2228impl IcebergSinkCommitter {
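    /// Parse the per-writer `SinkMetadata` into commit results, verify that all writers agree
    /// on the schema id and partition spec id, and pre-generate the snapshot id for the commit.
    /// Returns `None` when there is nothing to commit.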
2230 fn pre_commit_inner(
2231 &mut self,
2232 _epoch: u64,
2233 metadata: Vec<SinkMetadata>,
2234 ) -> Result<Option<(Vec<IcebergCommitResult>, i64)>> {
2235 let write_results: Vec<IcebergCommitResult> = metadata
2236 .iter()
2237 .map(IcebergCommitResult::try_from)
2238 .collect::<Result<Vec<IcebergCommitResult>>>()?;
2239
2240 if write_results.is_empty() || write_results.iter().all(|r| r.data_files.is_empty()) {
2242 return Ok(None);
2243 }
2244
2245 let expect_schema_id = write_results[0].schema_id;
2246 let expect_partition_spec_id = write_results[0].partition_spec_id;
2247
2248 if write_results
2250 .iter()
2251 .any(|r| r.schema_id != expect_schema_id)
2252 || write_results
2253 .iter()
2254 .any(|r| r.partition_spec_id != expect_partition_spec_id)
2255 {
2256 return Err(SinkError::Iceberg(anyhow!(
2257 "schema_id and partition_spec_id should be the same in all write results"
2258 )));
2259 }
2260
2261 let snapshot_id = FastAppendAction::generate_snapshot_id(&self.table);
2262
2263 Ok(Some((write_results, snapshot_id)))
2264 }
2265
2266 async fn commit_data_impl(
2267 &mut self,
2268 epoch: u64,
2269 write_results: Vec<IcebergCommitResult>,
2270 snapshot_id: i64,
2271 ) -> Result<()> {
2272 assert!(
2274 !write_results.is_empty() && !write_results.iter().all(|r| r.data_files.is_empty())
2275 );
2276
2277 self.wait_for_snapshot_limit().await?;
2279
2280 let expect_schema_id = write_results[0].schema_id;
2281 let expect_partition_spec_id = write_results[0].partition_spec_id;
2282
2283 self.table = Self::reload_table(
2285 self.catalog.as_ref(),
2286 self.table.identifier(),
2287 expect_schema_id,
2288 expect_partition_spec_id,
2289 )
2290 .await?;
2291
2292 let Some(schema) = self.table.metadata().schema_by_id(expect_schema_id) else {
2293 return Err(SinkError::Iceberg(anyhow!(
2294 "Can't find schema by id {}",
2295 expect_schema_id
2296 )));
2297 };
2298 let Some(partition_spec) = self
2299 .table
2300 .metadata()
2301 .partition_spec_by_id(expect_partition_spec_id)
2302 else {
2303 return Err(SinkError::Iceberg(anyhow!(
2304 "Can't find partition spec by id {}",
2305 expect_partition_spec_id
2306 )));
2307 };
2308 let partition_type = partition_spec
2309 .as_ref()
2310 .clone()
2311 .partition_type(schema)
2312 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
2313
2314 let data_files = write_results
2315 .into_iter()
2316 .flat_map(|r| {
2317 r.data_files.into_iter().map(|f| {
2318 f.try_into(expect_partition_spec_id, &partition_type, schema)
2319 .map_err(|err| SinkError::Iceberg(anyhow!(err)))
2320 })
2321 })
2322 .collect::<Result<Vec<DataFile>>>()?;
2323 let retry_strategy = ExponentialBackoff::from_millis(10)
2328 .max_delay(Duration::from_secs(60))
2329 .map(jitter)
2330 .take(self.commit_retry_num as usize);
2331 let catalog = self.catalog.clone();
2332 let table_ident = self.table.identifier().clone();
2333
        enum CommitError {
            ReloadTable(SinkError),
            Commit(SinkError),
        }
2342
2343 let table = RetryIf::spawn(
2344 retry_strategy,
2345 || async {
2346 let table = Self::reload_table(
2348 catalog.as_ref(),
2349 &table_ident,
2350 expect_schema_id,
2351 expect_partition_spec_id,
2352 )
2353 .await
2354 .map_err(|e| {
2355 tracing::error!(error = %e.as_report(), "Failed to reload iceberg table");
2356 CommitError::ReloadTable(e)
2357 })?;
2358
2359 let txn = Transaction::new(&table);
2360 let append_action = txn
2361 .fast_append()
2362 .set_snapshot_id(snapshot_id)
2363 .set_target_branch(commit_branch(
2364 self.config.r#type.as_str(),
2365 self.config.write_mode,
2366 ))
2367 .add_data_files(data_files.clone());
2368
2369 let tx = append_action.apply(txn).map_err(|err| {
2370 let err: IcebergError = err.into();
2371 tracing::error!(error = %err.as_report(), "Failed to apply iceberg table");
2372 CommitError::Commit(SinkError::Iceberg(anyhow!(err)))
2373 })?;
2374
2375 tx.commit(catalog.as_ref()).await.map_err(|err| {
2376 let err: IcebergError = err.into();
2377 tracing::error!(error = %err.as_report(), "Failed to commit iceberg table");
2378 CommitError::Commit(SinkError::Iceberg(anyhow!(err)))
2379 })
2380 },
2381 |err: &CommitError| {
2382 match err {
2384 CommitError::Commit(_) => {
2385 tracing::warn!("Commit failed, will retry");
2386 true
2387 }
2388 CommitError::ReloadTable(_) => {
2389 tracing::error!(
2390 "reload_table failed with non-retriable error, will not retry"
2391 );
2392 false
2393 }
2394 }
2395 },
2396 )
2397 .await
2398 .map_err(|e| match e {
2399 CommitError::ReloadTable(e) | CommitError::Commit(e) => e,
2400 })?;
2401 self.table = table;
2402
2403 let snapshot_num = self.table.metadata().snapshots().count();
2404 let catalog_name = self.config.common.catalog_name();
2405 let table_name = self.table.identifier().to_string();
2406 let metrics_labels = [&self.param.sink_name, &catalog_name, &table_name];
2407 GLOBAL_SINK_METRICS
2408 .iceberg_snapshot_num
2409 .with_guarded_label_values(&metrics_labels)
2410 .set(snapshot_num as i64);
2411
2412 tracing::debug!("Succeeded to commit to iceberg table in epoch {epoch}.");
2413
2414 if let Some(iceberg_compact_stat_sender) = &self.iceberg_compact_stat_sender
2415 && self.config.enable_compaction
2416 && iceberg_compact_stat_sender
2417 .send(IcebergSinkCompactionUpdate {
2418 sink_id: self.sink_id,
2419 compaction_interval: self.config.compaction_interval_sec(),
2420 force_compaction: false,
2421 })
2422 .is_err()
2423 {
2424 warn!("failed to send iceberg compaction stats");
2425 }
2426
2427 Ok(())
2428 }
2429
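    /// Check whether a snapshot with the given id has already been committed to the table.
    /// This makes the two-phase commit idempotent: a retried `commit_data` with the same
    /// pre-commit metadata becomes a no-op.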
2430 async fn is_snapshot_id_in_iceberg(
2434 &self,
2435 iceberg_config: &IcebergConfig,
2436 snapshot_id: i64,
2437 ) -> Result<bool> {
2438 let table = iceberg_config.load_table().await?;
        Ok(table.metadata().snapshot_by_id(snapshot_id).is_some())
2444 }
2445
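    /// Determine whether `schema_change` has already been applied by comparing the table's
    /// current schema against the original schema and against the original schema plus the
    /// added columns. Returns `Ok(false)` if the change is still pending, `Ok(true)` if it
    /// has already been applied, and an error if the current schema matches neither.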
2446 fn check_schema_change_applied(&self, schema_change: &PbSinkSchemaChange) -> Result<bool> {
2449 let current_schema = self.table.metadata().current_schema();
2450 let current_arrow_schema = schema_to_arrow_schema(current_schema.as_ref())
2451 .context("Failed to convert schema")
2452 .map_err(SinkError::Iceberg)?;
2453
2454 let iceberg_arrow_convert = IcebergArrowConvert;
2455
2456 let schema_matches = |expected: &[ArrowField]| {
2457 if current_arrow_schema.fields().len() != expected.len() {
2458 return false;
2459 }
2460
2461 expected.iter().all(|expected_field| {
2462 current_arrow_schema.fields().iter().any(|current_field| {
2463 current_field.name() == expected_field.name()
2464 && current_field.data_type() == expected_field.data_type()
2465 })
2466 })
2467 };
2468
2469 let original_arrow_fields: Vec<ArrowField> = schema_change
2470 .original_schema
2471 .iter()
2472 .map(|pb_field| {
2473 let field = Field::from(pb_field);
2474 iceberg_arrow_convert
2475 .to_arrow_field(&field.name, &field.data_type)
2476 .context("Failed to convert field to arrow")
2477 .map_err(SinkError::Iceberg)
2478 })
2479 .collect::<Result<_>>()?;
2480
2481 if schema_matches(&original_arrow_fields) {
2483 tracing::debug!(
2484 "Current iceberg schema matches original_schema ({} columns); schema change not applied",
2485 original_arrow_fields.len()
2486 );
2487 return Ok(false);
2488 }
2489
2490 let Some(risingwave_pb::stream_plan::sink_schema_change::Op::AddColumns(add_columns_op)) =
2492 schema_change.op.as_ref()
2493 else {
2494 return Err(SinkError::Iceberg(anyhow!(
2495 "Unsupported sink schema change op in iceberg sink: {:?}",
2496 schema_change.op
2497 )));
2498 };
2499
2500 let add_arrow_fields: Vec<ArrowField> = add_columns_op
2501 .fields
2502 .iter()
2503 .map(|pb_field| {
2504 let field = Field::from(pb_field);
2505 iceberg_arrow_convert
2506 .to_arrow_field(&field.name, &field.data_type)
2507 .context("Failed to convert field to arrow")
2508 .map_err(SinkError::Iceberg)
2509 })
2510 .collect::<Result<_>>()?;
2511
2512 let mut expected_after_change = original_arrow_fields;
2513 expected_after_change.extend(add_arrow_fields);
2514
2515 if schema_matches(&expected_after_change) {
2517 tracing::debug!(
2518 "Current iceberg schema matches original_schema + add_columns ({} columns); schema change already applied",
2519 expected_after_change.len()
2520 );
2521 return Ok(true);
2522 }
2523
2524 Err(SinkError::Iceberg(anyhow!(
2525 "Current iceberg schema does not match either original_schema ({} cols) or original_schema + add_columns; cannot determine whether schema change is applied",
2526 schema_change.original_schema.len()
2527 )))
2528 }
2529
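    /// Apply an `AddColumns` schema change to the Iceberg table: each new column is added as
    /// an optional field with a freshly allocated field id and committed through the catalog.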
2530 async fn commit_schema_change_impl(&mut self, schema_change: PbSinkSchemaChange) -> Result<()> {
2534 use iceberg::spec::NestedField;
2535
2536 let Some(risingwave_pb::stream_plan::sink_schema_change::Op::AddColumns(add_columns_op)) =
2537 schema_change.op.as_ref()
2538 else {
2539 return Err(SinkError::Iceberg(anyhow!(
2540 "Unsupported sink schema change op in iceberg sink: {:?}",
2541 schema_change.op
2542 )));
2543 };
2544
2545 let add_columns = add_columns_op.fields.iter().map(Field::from).collect_vec();
2546
2547 let metadata = self.table.metadata();
2549 let mut next_field_id = metadata.last_column_id() + 1;
2550 tracing::debug!("Starting schema change, next_field_id: {}", next_field_id);
2551
2552 let iceberg_create_table_arrow_convert = IcebergCreateTableArrowConvert::default();
2554 let mut new_fields = Vec::new();
2555
2556 for field in &add_columns {
2557 let arrow_field = iceberg_create_table_arrow_convert
2559 .to_arrow_field(&field.name, &field.data_type)
2560 .with_context(|| format!("Failed to convert field '{}' to arrow", field.name))
2561 .map_err(SinkError::Iceberg)?;
2562
2563 let iceberg_type = iceberg::arrow::arrow_type_to_type(arrow_field.data_type())
2565 .map_err(|err| {
2566 SinkError::Iceberg(
2567 anyhow!(err).context("Failed to convert Arrow type to Iceberg type"),
2568 )
2569 })?;
2570
2571 let nested_field = Arc::new(NestedField::optional(
2573 next_field_id,
2574 &field.name,
2575 iceberg_type,
2576 ));
2577
2578 new_fields.push(nested_field);
2579 tracing::info!("Prepared field '{}' with ID {}", field.name, next_field_id);
2580 next_field_id += 1;
2581 }
2582
2583 tracing::info!(
2585 "Committing schema change to catalog for table {}",
2586 self.table.identifier()
2587 );
2588
2589 let txn = Transaction::new(&self.table);
2590 let action = txn.update_schema().add_fields(new_fields);
2591
2592 let updated_table = action
2593 .apply(txn)
2594 .context("Failed to apply schema update action")
2595 .map_err(SinkError::Iceberg)?
2596 .commit(self.catalog.as_ref())
2597 .await
2598 .context("Failed to commit table schema change")
2599 .map_err(SinkError::Iceberg)?;
2600
2601 self.table = updated_table;
2602
2603 tracing::info!(
2604 "Successfully committed schema change, added {} columns to iceberg table",
2605 add_columns.len()
2606 );
2607
2608 Ok(())
2609 }
2610
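    /// Count the snapshots created after the most recent rewrite (`Replace`) snapshot,
    /// walking the snapshot list from newest to oldest.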
2611 fn count_snapshots_since_rewrite(&self) -> usize {
2614 let mut snapshots: Vec<_> = self.table.metadata().snapshots().collect();
2615 snapshots.sort_by_key(|b| std::cmp::Reverse(b.timestamp_ms()));
2616
2617 let mut count = 0;
2619 for snapshot in snapshots {
2620 let summary = snapshot.summary();
2622 match &summary.operation {
2623 Operation::Replace => {
2624 break;
2626 }
2627
2628 _ => {
2629 count += 1;
2631 }
2632 }
2633 }
2634
2635 count
2636 }
2637
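    /// If compaction is enabled and `compaction.max_snapshots_num` is configured, hold the
    /// commit until the number of snapshots since the last rewrite drops below the limit,
    /// asking the compactor to run and reloading the table between checks.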
2638 async fn wait_for_snapshot_limit(&mut self) -> Result<()> {
2640 if !self.config.enable_compaction {
2641 return Ok(());
2642 }
2643
2644 if let Some(max_snapshots) = self.config.max_snapshots_num_before_compaction {
2645 loop {
2646 let current_count = self.count_snapshots_since_rewrite();
2647
2648 if current_count < max_snapshots {
2649 tracing::info!(
2650 "Snapshot count check passed: {} < {}",
2651 current_count,
2652 max_snapshots
2653 );
2654 break;
2655 }
2656
2657 tracing::info!(
2658 "Snapshot count {} exceeds limit {}, waiting...",
2659 current_count,
2660 max_snapshots
2661 );
2662
2663 if let Some(iceberg_compact_stat_sender) = &self.iceberg_compact_stat_sender
2664 && iceberg_compact_stat_sender
2665 .send(IcebergSinkCompactionUpdate {
2666 sink_id: self.sink_id,
2667 compaction_interval: self.config.compaction_interval_sec(),
2668 force_compaction: true,
2669 })
2670 .is_err()
2671 {
2672 tracing::warn!("failed to send iceberg compaction stats");
2673 }
2674
2675 tokio::time::sleep(Duration::from_secs(30)).await;
2677
2678 self.table = self.config.load_table().await?;
2680 }
2681 }
2682 Ok(())
2683 }
2684}
2685
2686const MAP_KEY: &str = "key";
2687const MAP_VALUE: &str = "value";
2688
2689fn get_fields<'a>(
2690 our_field_type: &'a risingwave_common::types::DataType,
2691 data_type: &ArrowDataType,
2692 schema_fields: &mut HashMap<&'a str, &'a risingwave_common::types::DataType>,
2693) -> Option<ArrowFields> {
2694 match data_type {
2695 ArrowDataType::Struct(fields) => {
2696 match our_field_type {
2697 risingwave_common::types::DataType::Struct(struct_fields) => {
2698 struct_fields.iter().for_each(|(name, data_type)| {
2699 let res = schema_fields.insert(name, data_type);
2700 assert!(res.is_none())
2702 });
2703 }
2704 risingwave_common::types::DataType::Map(map_fields) => {
2705 schema_fields.insert(MAP_KEY, map_fields.key());
2706 schema_fields.insert(MAP_VALUE, map_fields.value());
2707 }
2708 risingwave_common::types::DataType::List(list) => {
2709 list.elem()
2710 .as_struct()
2711 .iter()
2712 .for_each(|(name, data_type)| {
2713 let res = schema_fields.insert(name, data_type);
2714 assert!(res.is_none())
2716 });
2717 }
2718 _ => {}
2719 };
2720 Some(fields.clone())
2721 }
2722 ArrowDataType::List(field) | ArrowDataType::Map(field, _) => {
2723 get_fields(our_field_type, field.data_type(), schema_fields)
2724 }
        _ => None,
    }
2727}
2728
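/// Recursively check that every Iceberg field has a type-compatible counterpart in
/// `schema_fields` (RisingWave fields keyed by name), bailing with a descriptive error on
/// the first mismatch.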
2729fn check_compatibility(
2730 schema_fields: HashMap<&str, &risingwave_common::types::DataType>,
2731 fields: &ArrowFields,
2732) -> anyhow::Result<bool> {
2733 for arrow_field in fields {
2734 let our_field_type = schema_fields
2735 .get(arrow_field.name().as_str())
2736 .ok_or_else(|| anyhow!("Field {} not found in our schema", arrow_field.name()))?;
2737
2738 let converted_arrow_data_type = IcebergArrowConvert
2740 .to_arrow_field("", our_field_type)
2741 .map_err(|e| anyhow!(e))?
2742 .data_type()
2743 .clone();
2744
2745 let compatible = match (&converted_arrow_data_type, arrow_field.data_type()) {
2746 (ArrowDataType::Decimal128(_, _), ArrowDataType::Decimal128(_, _)) => true,
2747 (ArrowDataType::Binary, ArrowDataType::LargeBinary) => true,
2748 (ArrowDataType::LargeBinary, ArrowDataType::Binary) => true,
2749 (ArrowDataType::List(_), ArrowDataType::List(field))
2750 | (ArrowDataType::Map(_, _), ArrowDataType::Map(field, _)) => {
2751 let mut schema_fields = HashMap::new();
2752 get_fields(our_field_type, field.data_type(), &mut schema_fields)
2753 .is_none_or(|fields| check_compatibility(schema_fields, &fields).unwrap())
2754 }
2755 (ArrowDataType::Struct(_), ArrowDataType::Struct(fields)) => {
2757 let mut schema_fields = HashMap::new();
2758 our_field_type
2759 .as_struct()
2760 .iter()
2761 .for_each(|(name, data_type)| {
2762 let res = schema_fields.insert(name, data_type);
2763 assert!(res.is_none())
2765 });
2766 check_compatibility(schema_fields, fields)?
2767 }
2768 (left, right) => left.equals_datatype(right),
2776 };
2777 if !compatible {
2778 bail!(
2779 "field {}'s type is incompatible\nRisingWave converted data type: {}\niceberg's data type: {}",
2780 arrow_field.name(),
2781 converted_arrow_data_type,
2782 arrow_field.data_type()
2783 );
2784 }
2785 }
2786 Ok(true)
2787}
2788
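/// Check that a RisingWave schema can be written to the given Iceberg (Arrow) schema:
/// the field counts must match and every Iceberg field must have a type-compatible
/// counterpart of the same name on the RisingWave side.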
2789pub fn try_matches_arrow_schema(rw_schema: &Schema, arrow_schema: &ArrowSchema) -> Result<()> {
2791 if rw_schema.fields.len() != arrow_schema.fields().len() {
2792 bail!(
2793 "Schema length mismatch, risingwave is {}, and iceberg is {}",
2794 rw_schema.fields.len(),
2795 arrow_schema.fields.len()
2796 );
2797 }
2798
2799 let mut schema_fields = HashMap::new();
2800 rw_schema.fields.iter().for_each(|field| {
2801 let res = schema_fields.insert(field.name.as_str(), &field.data_type);
2802 assert!(res.is_none())
2804 });
2805
2806 check_compatibility(schema_fields, &arrow_schema.fields)?;
2807 Ok(())
2808}
2809
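/// The coordinator's pre-commit metadata is a JSON-encoded `Vec<Vec<u8>>`: one serialized
/// `IcebergCommitResult` per writer, followed by the snapshot id in little-endian bytes.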
2810fn serialize_metadata(metadata: Vec<Vec<u8>>) -> Vec<u8> {
2811 serde_json::to_vec(&metadata).unwrap()
2812}
2813
2814fn deserialize_metadata(bytes: Vec<u8>) -> Vec<Vec<u8>> {
2815 serde_json::from_slice(&bytes).unwrap()
2816}
2817
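/// Parse a `partition_by` expression such as `"v1, bucket(5,v2), truncate(4,v3), year(v4)"`
/// into `(column, Transform)` pairs. Supported forms are `column`, `transform(column)`,
/// `transform(n,column)` and `transform(n, column)`.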
2818pub fn parse_partition_by_exprs(
2819 expr: String,
2820) -> std::result::Result<Vec<(String, Transform)>, anyhow::Error> {
2821 let re = Regex::new(r"(?<transform>\w+)(\(((?<n>\d+)?(?:,|(,\s)))?(?<field>\w+)\))?").unwrap();
2823 if !re.is_match(&expr) {
2824 bail!(format!(
2825 "Invalid partition fields: {}\nHINT: Supported formats are column, transform(column), transform(n,column), transform(n, column)",
2826 expr
2827 ))
2828 }
2829 let caps = re.captures_iter(&expr);
2830
2831 let mut partition_columns = vec![];
2832
2833 for mat in caps {
2834 let (column, transform) = if mat.name("n").is_none() && mat.name("field").is_none() {
2835 (&mat["transform"], Transform::Identity)
2836 } else {
2837 let mut func = mat["transform"].to_owned();
2838 if func == "bucket" || func == "truncate" {
2839 let n = &mat
2840 .name("n")
2841 .ok_or_else(|| anyhow!("The `n` must be set with `bucket` and `truncate`"))?
2842 .as_str();
2843 func = format!("{func}[{n}]");
2844 }
2845 (
2846 &mat["field"],
2847 Transform::from_str(&func)
2848 .with_context(|| format!("invalid transform function {}", func))?,
2849 )
2850 };
2851 partition_columns.push((column.to_owned(), transform));
2852 }
2853 Ok(partition_columns)
2854}
2855
2856pub fn commit_branch(sink_type: &str, write_mode: IcebergWriteMode) -> String {
2857 if should_enable_iceberg_cow(sink_type, write_mode) {
2858 ICEBERG_COW_BRANCH.to_owned()
2859 } else {
2860 MAIN_BRANCH.to_owned()
2861 }
2862}
2863
2864pub fn should_enable_iceberg_cow(sink_type: &str, write_mode: IcebergWriteMode) -> bool {
2865 sink_type == SINK_TYPE_UPSERT && write_mode == IcebergWriteMode::CopyOnWrite
2866}
2867
2868impl crate::with_options::WithOptions for IcebergWriteMode {}
2869
2870impl crate::with_options::WithOptions for FormatVersion {}
2871
2872impl crate::with_options::WithOptions for CompactionType {}
2873
2874#[cfg(test)]
2875mod test {
2876 use std::collections::BTreeMap;
2877
2878 use risingwave_common::array::arrow::arrow_schema_iceberg::FieldRef as ArrowFieldRef;
2879 use risingwave_common::types::{DataType, MapType, StructType};
2880
2881 use crate::connector_common::{IcebergCommon, IcebergTableIdentifier};
2882 use crate::sink::decouple_checkpoint_log_sink::ICEBERG_DEFAULT_COMMIT_CHECKPOINT_INTERVAL;
2883 use crate::sink::iceberg::{
2884 COMPACTION_INTERVAL_SEC, COMPACTION_MAX_SNAPSHOTS_NUM, CompactionType, ENABLE_COMPACTION,
2885 ENABLE_SNAPSHOT_EXPIRATION, FormatVersion, IcebergConfig, IcebergWriteMode,
2886 SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_FILES, SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_META_DATA,
2887 SNAPSHOT_EXPIRATION_MAX_AGE_MILLIS, SNAPSHOT_EXPIRATION_RETAIN_LAST, WRITE_MODE,
2888 };
2889
    pub const DEFAULT_ICEBERG_COMPACTION_INTERVAL: u64 = 3600;

    #[test]
2893 fn test_compatible_arrow_schema() {
2894 use arrow_schema_iceberg::{DataType as ArrowDataType, Field as ArrowField};
2895
2896 use super::*;
2897 let risingwave_schema = Schema::new(vec![
2898 Field::with_name(DataType::Int32, "a"),
2899 Field::with_name(DataType::Int32, "b"),
2900 Field::with_name(DataType::Int32, "c"),
2901 ]);
2902 let arrow_schema = ArrowSchema::new(vec![
2903 ArrowField::new("a", ArrowDataType::Int32, false),
2904 ArrowField::new("b", ArrowDataType::Int32, false),
2905 ArrowField::new("c", ArrowDataType::Int32, false),
2906 ]);
2907
2908 try_matches_arrow_schema(&risingwave_schema, &arrow_schema).unwrap();
2909
2910 let risingwave_schema = Schema::new(vec![
2911 Field::with_name(DataType::Int32, "d"),
2912 Field::with_name(DataType::Int32, "c"),
2913 Field::with_name(DataType::Int32, "a"),
2914 Field::with_name(DataType::Int32, "b"),
2915 ]);
2916 let arrow_schema = ArrowSchema::new(vec![
2917 ArrowField::new("a", ArrowDataType::Int32, false),
2918 ArrowField::new("b", ArrowDataType::Int32, false),
2919 ArrowField::new("d", ArrowDataType::Int32, false),
2920 ArrowField::new("c", ArrowDataType::Int32, false),
2921 ]);
2922 try_matches_arrow_schema(&risingwave_schema, &arrow_schema).unwrap();
2923
2924 let risingwave_schema = Schema::new(vec![
2925 Field::with_name(
2926 DataType::Struct(StructType::new(vec![
2927 ("a1", DataType::Int32),
2928 (
2929 "a2",
2930 DataType::Struct(StructType::new(vec![
2931 ("a21", DataType::Bytea),
2932 (
2933 "a22",
2934 DataType::Map(MapType::from_kv(DataType::Varchar, DataType::Jsonb)),
2935 ),
2936 ])),
2937 ),
2938 ])),
2939 "a",
2940 ),
2941 Field::with_name(
2942 DataType::list(DataType::Struct(StructType::new(vec![
2943 ("b1", DataType::Int32),
2944 ("b2", DataType::Bytea),
2945 (
2946 "b3",
2947 DataType::Map(MapType::from_kv(DataType::Varchar, DataType::Jsonb)),
2948 ),
2949 ]))),
2950 "b",
2951 ),
2952 Field::with_name(
2953 DataType::Map(MapType::from_kv(
2954 DataType::Varchar,
2955 DataType::list(DataType::Struct(StructType::new([
2956 ("c1", DataType::Int32),
2957 ("c2", DataType::Bytea),
2958 (
2959 "c3",
2960 DataType::Map(MapType::from_kv(DataType::Varchar, DataType::Jsonb)),
2961 ),
2962 ]))),
2963 )),
2964 "c",
2965 ),
2966 ]);
2967 let arrow_schema = ArrowSchema::new(vec![
2968 ArrowField::new(
2969 "a",
2970 ArrowDataType::Struct(ArrowFields::from(vec![
2971 ArrowField::new("a1", ArrowDataType::Int32, false),
2972 ArrowField::new(
2973 "a2",
2974 ArrowDataType::Struct(ArrowFields::from(vec![
2975 ArrowField::new("a21", ArrowDataType::LargeBinary, false),
2976 ArrowField::new_map(
2977 "a22",
2978 "entries",
2979 ArrowFieldRef::new(ArrowField::new(
2980 "key",
2981 ArrowDataType::Utf8,
2982 false,
2983 )),
2984 ArrowFieldRef::new(ArrowField::new(
2985 "value",
2986 ArrowDataType::Utf8,
2987 false,
2988 )),
2989 false,
2990 false,
2991 ),
2992 ])),
2993 false,
2994 ),
2995 ])),
2996 false,
2997 ),
2998 ArrowField::new(
2999 "b",
3000 ArrowDataType::List(ArrowFieldRef::new(ArrowField::new_list_field(
3001 ArrowDataType::Struct(ArrowFields::from(vec![
3002 ArrowField::new("b1", ArrowDataType::Int32, false),
3003 ArrowField::new("b2", ArrowDataType::LargeBinary, false),
3004 ArrowField::new_map(
3005 "b3",
3006 "entries",
3007 ArrowFieldRef::new(ArrowField::new("key", ArrowDataType::Utf8, false)),
3008 ArrowFieldRef::new(ArrowField::new(
3009 "value",
3010 ArrowDataType::Utf8,
3011 false,
3012 )),
3013 false,
3014 false,
3015 ),
3016 ])),
3017 false,
3018 ))),
3019 false,
3020 ),
3021 ArrowField::new_map(
3022 "c",
3023 "entries",
3024 ArrowFieldRef::new(ArrowField::new("key", ArrowDataType::Utf8, false)),
3025 ArrowFieldRef::new(ArrowField::new(
3026 "value",
3027 ArrowDataType::List(ArrowFieldRef::new(ArrowField::new_list_field(
3028 ArrowDataType::Struct(ArrowFields::from(vec![
3029 ArrowField::new("c1", ArrowDataType::Int32, false),
3030 ArrowField::new("c2", ArrowDataType::LargeBinary, false),
3031 ArrowField::new_map(
3032 "c3",
3033 "entries",
3034 ArrowFieldRef::new(ArrowField::new(
3035 "key",
3036 ArrowDataType::Utf8,
3037 false,
3038 )),
3039 ArrowFieldRef::new(ArrowField::new(
3040 "value",
3041 ArrowDataType::Utf8,
3042 false,
3043 )),
3044 false,
3045 false,
3046 ),
3047 ])),
3048 false,
3049 ))),
3050 false,
3051 )),
3052 false,
3053 false,
3054 ),
3055 ]);
3056 try_matches_arrow_schema(&risingwave_schema, &arrow_schema).unwrap();
3057 }
3058
3059 #[test]
3060 fn test_parse_iceberg_config() {
3061 let values = [
3062 ("connector", "iceberg"),
3063 ("type", "upsert"),
3064 ("primary_key", "v1"),
3065 ("partition_by", "v1, identity(v1), truncate(4,v2), bucket(5,v1), year(v3), month(v4), day(v5), hour(v6), void(v1)"),
3066 ("warehouse.path", "s3://iceberg"),
3067 ("s3.endpoint", "http://127.0.0.1:9301"),
3068 ("s3.access.key", "hummockadmin"),
3069 ("s3.secret.key", "hummockadmin"),
3070 ("s3.path.style.access", "true"),
3071 ("s3.region", "us-east-1"),
3072 ("catalog.type", "jdbc"),
3073 ("catalog.name", "demo"),
3074 ("catalog.uri", "jdbc://postgresql://postgres:5432/iceberg"),
3075 ("catalog.jdbc.user", "admin"),
3076 ("catalog.jdbc.password", "123456"),
3077 ("database.name", "demo_db"),
3078 ("table.name", "demo_table"),
3079 ("enable_compaction", "true"),
3080 ("compaction_interval_sec", "1800"),
3081 ("enable_snapshot_expiration", "true"),
3082 ]
3083 .into_iter()
3084 .map(|(k, v)| (k.to_owned(), v.to_owned()))
3085 .collect();
3086
3087 let iceberg_config = IcebergConfig::from_btreemap(values).unwrap();
3088
3089 let expected_iceberg_config = IcebergConfig {
3090 common: IcebergCommon {
3091 warehouse_path: Some("s3://iceberg".to_owned()),
3092 catalog_uri: Some("jdbc://postgresql://postgres:5432/iceberg".to_owned()),
3093 s3_region: Some("us-east-1".to_owned()),
3094 s3_endpoint: Some("http://127.0.0.1:9301".to_owned()),
3095 s3_access_key: Some("hummockadmin".to_owned()),
3096 s3_secret_key: Some("hummockadmin".to_owned()),
3097 s3_iam_role_arn: None,
3098 gcs_credential: None,
3099 catalog_type: Some("jdbc".to_owned()),
3100 glue_id: None,
3101 glue_region: None,
3102 glue_access_key: None,
3103 glue_secret_key: None,
3104 glue_iam_role_arn: None,
3105 catalog_name: Some("demo".to_owned()),
3106 s3_path_style_access: Some(true),
3107 catalog_credential: None,
3108 catalog_oauth2_server_uri: None,
3109 catalog_scope: None,
3110 catalog_token: None,
3111 enable_config_load: None,
3112 rest_signing_name: None,
3113 rest_signing_region: None,
3114 rest_sigv4_enabled: None,
3115 hosted_catalog: None,
3116 azblob_account_name: None,
3117 azblob_account_key: None,
3118 azblob_endpoint_url: None,
3119 catalog_header: None,
3120 adlsgen2_account_name: None,
3121 adlsgen2_account_key: None,
3122 adlsgen2_endpoint: None,
3123 vended_credentials: None,
3124 catalog_security: None,
3125 gcp_auth_scopes: None,
3126 catalog_io_impl: None,
3127 },
3128 table: IcebergTableIdentifier {
3129 database_name: Some("demo_db".to_owned()),
3130 table_name: "demo_table".to_owned(),
3131 },
3132 r#type: "upsert".to_owned(),
3133 force_append_only: false,
3134 primary_key: Some(vec!["v1".to_owned()]),
3135 partition_by: Some("v1, identity(v1), truncate(4,v2), bucket(5,v1), year(v3), month(v4), day(v5), hour(v6), void(v1)".to_owned()),
3136 java_catalog_props: [("jdbc.user", "admin"), ("jdbc.password", "123456")]
3137 .into_iter()
3138 .map(|(k, v)| (k.to_owned(), v.to_owned()))
3139 .collect(),
3140 commit_checkpoint_interval: ICEBERG_DEFAULT_COMMIT_CHECKPOINT_INTERVAL,
3141 create_table_if_not_exists: false,
3142 is_exactly_once: Some(true),
3143 commit_retry_num: 8,
3144 enable_compaction: true,
3145 compaction_interval_sec: Some(DEFAULT_ICEBERG_COMPACTION_INTERVAL / 2),
3146 enable_snapshot_expiration: true,
3147 write_mode: IcebergWriteMode::MergeOnRead,
3148 format_version: FormatVersion::V2,
3149 snapshot_expiration_max_age_millis: None,
3150 snapshot_expiration_retain_last: None,
3151 snapshot_expiration_clear_expired_files: true,
3152 snapshot_expiration_clear_expired_meta_data: true,
3153 max_snapshots_num_before_compaction: None,
3154 small_files_threshold_mb: None,
3155 delete_files_count_threshold: None,
3156 trigger_snapshot_count: None,
3157 target_file_size_mb: None,
3158 compaction_type: None,
3159 };
3160
3161 assert_eq!(iceberg_config, expected_iceberg_config);
3162
3163 assert_eq!(
3164 &iceberg_config.full_table_name().unwrap().to_string(),
3165 "demo_db.demo_table"
3166 );
3167 }
3168
3169 async fn test_create_catalog(configs: BTreeMap<String, String>) {
3170 let iceberg_config = IcebergConfig::from_btreemap(configs).unwrap();
3171
3172 let _table = iceberg_config.load_table().await.unwrap();
3173 }
3174
3175 #[tokio::test]
3176 #[ignore]
3177 async fn test_storage_catalog() {
3178 let values = [
3179 ("connector", "iceberg"),
3180 ("type", "append-only"),
3181 ("force_append_only", "true"),
3182 ("s3.endpoint", "http://127.0.0.1:9301"),
3183 ("s3.access.key", "hummockadmin"),
3184 ("s3.secret.key", "hummockadmin"),
3185 ("s3.region", "us-east-1"),
3186 ("s3.path.style.access", "true"),
3187 ("catalog.name", "demo"),
3188 ("catalog.type", "storage"),
3189 ("warehouse.path", "s3://icebergdata/demo"),
3190 ("database.name", "s1"),
3191 ("table.name", "t1"),
3192 ]
3193 .into_iter()
3194 .map(|(k, v)| (k.to_owned(), v.to_owned()))
3195 .collect();
3196
3197 test_create_catalog(values).await;
3198 }
3199
3200 #[tokio::test]
3201 #[ignore]
3202 async fn test_rest_catalog() {
3203 let values = [
3204 ("connector", "iceberg"),
3205 ("type", "append-only"),
3206 ("force_append_only", "true"),
3207 ("s3.endpoint", "http://127.0.0.1:9301"),
3208 ("s3.access.key", "hummockadmin"),
3209 ("s3.secret.key", "hummockadmin"),
3210 ("s3.region", "us-east-1"),
3211 ("s3.path.style.access", "true"),
3212 ("catalog.name", "demo"),
3213 ("catalog.type", "rest"),
3214 ("catalog.uri", "http://192.168.167.4:8181"),
3215 ("warehouse.path", "s3://icebergdata/demo"),
3216 ("database.name", "s1"),
3217 ("table.name", "t1"),
3218 ]
3219 .into_iter()
3220 .map(|(k, v)| (k.to_owned(), v.to_owned()))
3221 .collect();
3222
3223 test_create_catalog(values).await;
3224 }
3225
3226 #[tokio::test]
3227 #[ignore]
3228 async fn test_jdbc_catalog() {
3229 let values = [
3230 ("connector", "iceberg"),
3231 ("type", "append-only"),
3232 ("force_append_only", "true"),
3233 ("s3.endpoint", "http://127.0.0.1:9301"),
3234 ("s3.access.key", "hummockadmin"),
3235 ("s3.secret.key", "hummockadmin"),
3236 ("s3.region", "us-east-1"),
3237 ("s3.path.style.access", "true"),
3238 ("catalog.name", "demo"),
3239 ("catalog.type", "jdbc"),
3240 ("catalog.uri", "jdbc:postgresql://localhost:5432/iceberg"),
3241 ("catalog.jdbc.user", "admin"),
3242 ("catalog.jdbc.password", "123456"),
3243 ("warehouse.path", "s3://icebergdata/demo"),
3244 ("database.name", "s1"),
3245 ("table.name", "t1"),
3246 ]
3247 .into_iter()
3248 .map(|(k, v)| (k.to_owned(), v.to_owned()))
3249 .collect();
3250
3251 test_create_catalog(values).await;
3252 }
3253
3254 #[tokio::test]
3255 #[ignore]
3256 async fn test_hive_catalog() {
3257 let values = [
3258 ("connector", "iceberg"),
3259 ("type", "append-only"),
3260 ("force_append_only", "true"),
3261 ("s3.endpoint", "http://127.0.0.1:9301"),
3262 ("s3.access.key", "hummockadmin"),
3263 ("s3.secret.key", "hummockadmin"),
3264 ("s3.region", "us-east-1"),
3265 ("s3.path.style.access", "true"),
3266 ("catalog.name", "demo"),
3267 ("catalog.type", "hive"),
3268 ("catalog.uri", "thrift://localhost:9083"),
3269 ("warehouse.path", "s3://icebergdata/demo"),
3270 ("database.name", "s1"),
3271 ("table.name", "t1"),
3272 ]
3273 .into_iter()
3274 .map(|(k, v)| (k.to_owned(), v.to_owned()))
3275 .collect();
3276
3277 test_create_catalog(values).await;
3278 }
3279
3280 #[test]
3282 fn test_parse_google_auth_with_custom_scopes() {
3283 let values: BTreeMap<String, String> = [
3284 ("connector", "iceberg"),
3285 ("type", "append-only"),
3286 ("force_append_only", "true"),
3287 ("catalog.name", "biglake-catalog"),
3288 ("catalog.type", "rest"),
3289 (
3290 "catalog.uri",
3291 "https://biglake.googleapis.com/iceberg/v1/restcatalog",
3292 ),
3293 ("warehouse.path", "bq://projects/my-gcp-project"),
3294 ("catalog.security", "google"),
3295 ("gcp.auth.scopes", "https://www.googleapis.com/auth/cloud-platform,https://www.googleapis.com/auth/bigquery"),
3296 ("database.name", "my_dataset"),
3297 ("table.name", "my_table"),
3298 ]
3299 .into_iter()
3300 .map(|(k, v)| (k.to_owned(), v.to_owned()))
3301 .collect();
3302
3303 let iceberg_config = IcebergConfig::from_btreemap(values).unwrap();
3304
3305 assert_eq!(iceberg_config.catalog_type(), "rest");
3307
3308 assert_eq!(
3310 iceberg_config.common.catalog_security.as_deref(),
3311 Some("google")
3312 );
3313 assert_eq!(
3314 iceberg_config.common.gcp_auth_scopes.as_deref(),
3315 Some(
3316 "https://www.googleapis.com/auth/cloud-platform,https://www.googleapis.com/auth/bigquery"
3317 )
3318 );
3319
3320 assert_eq!(
3322 iceberg_config.common.warehouse_path.as_deref(),
3323 Some("bq://projects/my-gcp-project")
3324 );
3325 }
3326
3327 #[test]
3329 fn test_parse_biglake_google_auth_config() {
3330 let values: BTreeMap<String, String> = [
3331 ("connector", "iceberg"),
3332 ("type", "append-only"),
3333 ("force_append_only", "true"),
3334 ("catalog.name", "biglake-catalog"),
3335 ("catalog.type", "rest"),
3336 (
3337 "catalog.uri",
3338 "https://biglake.googleapis.com/iceberg/v1/restcatalog",
3339 ),
3340 ("warehouse.path", "bq://projects/my-gcp-project"),
3341 (
3342 "catalog.header",
3343 "x-goog-user-project=my-gcp-project",
3344 ),
3345 ("catalog.security", "google"),
3346 ("gcp.auth.scopes", "https://www.googleapis.com/auth/cloud-platform,https://www.googleapis.com/auth/bigquery"),
3347 ("database.name", "my_dataset"),
3348 ("table.name", "my_table"),
3349 ]
3350 .into_iter()
3351 .map(|(k, v)| (k.to_owned(), v.to_owned()))
3352 .collect();
3353
3354 let iceberg_config = IcebergConfig::from_btreemap(values).unwrap();
3355
3356 assert_eq!(iceberg_config.catalog_type(), "rest");
3358
3359 assert_eq!(
3361 iceberg_config.common.catalog_security.as_deref(),
3362 Some("google")
3363 );
3364 assert_eq!(
3365 iceberg_config.common.gcp_auth_scopes.as_deref(),
3366 Some(
3367 "https://www.googleapis.com/auth/cloud-platform,https://www.googleapis.com/auth/bigquery"
3368 )
3369 );
3370
3371 assert_eq!(
3373 iceberg_config.common.warehouse_path.as_deref(),
3374 Some("bq://projects/my-gcp-project")
3375 );
3376
3377 assert_eq!(
3379 iceberg_config.common.catalog_header.as_deref(),
3380 Some("x-goog-user-project=my-gcp-project")
3381 );
3382 }
3383
3384 #[test]
3386 fn test_parse_oauth2_security_config() {
3387 let values: BTreeMap<String, String> = [
3388 ("connector", "iceberg"),
3389 ("type", "append-only"),
3390 ("force_append_only", "true"),
3391 ("catalog.name", "oauth2-catalog"),
3392 ("catalog.type", "rest"),
3393 ("catalog.uri", "https://example.com/iceberg/rest"),
3394 ("warehouse.path", "s3://my-bucket/warehouse"),
3395 ("catalog.security", "oauth2"),
3396 ("catalog.credential", "client_id:client_secret"),
3397 ("catalog.token", "bearer-token"),
3398 (
3399 "catalog.oauth2_server_uri",
3400 "https://oauth.example.com/token",
3401 ),
3402 ("catalog.scope", "read write"),
3403 ("database.name", "test_db"),
3404 ("table.name", "test_table"),
3405 ]
3406 .into_iter()
3407 .map(|(k, v)| (k.to_owned(), v.to_owned()))
3408 .collect();
3409
3410 let iceberg_config = IcebergConfig::from_btreemap(values).unwrap();
3411
3412 assert_eq!(iceberg_config.catalog_type(), "rest");
3414
3415 assert_eq!(
3417 iceberg_config.common.catalog_security.as_deref(),
3418 Some("oauth2")
3419 );
3420 assert_eq!(
3421 iceberg_config.common.catalog_credential.as_deref(),
3422 Some("client_id:client_secret")
3423 );
3424 assert_eq!(
3425 iceberg_config.common.catalog_token.as_deref(),
3426 Some("bearer-token")
3427 );
3428 assert_eq!(
3429 iceberg_config.common.catalog_oauth2_server_uri.as_deref(),
3430 Some("https://oauth.example.com/token")
3431 );
3432 assert_eq!(
3433 iceberg_config.common.catalog_scope.as_deref(),
3434 Some("read write")
3435 );
3436 }
3437
3438 #[test]
3440 fn test_parse_invalid_security_config() {
3441 let values: BTreeMap<String, String> = [
3442 ("connector", "iceberg"),
3443 ("type", "append-only"),
3444 ("force_append_only", "true"),
3445 ("catalog.name", "invalid-catalog"),
3446 ("catalog.type", "rest"),
3447 ("catalog.uri", "https://example.com/iceberg/rest"),
3448 ("warehouse.path", "s3://my-bucket/warehouse"),
3449 ("catalog.security", "invalid_security_type"),
3450 ("database.name", "test_db"),
3451 ("table.name", "test_table"),
3452 ]
3453 .into_iter()
3454 .map(|(k, v)| (k.to_owned(), v.to_owned()))
3455 .collect();
3456
3457 let iceberg_config = IcebergConfig::from_btreemap(values).unwrap();
3459
3460 assert_eq!(
3462 iceberg_config.common.catalog_security.as_deref(),
3463 Some("invalid_security_type")
3464 );
3465
3466 assert_eq!(iceberg_config.catalog_type(), "rest");
3468 }
3469
3470 #[test]
3472 fn test_parse_custom_io_impl_config() {
3473 let values: BTreeMap<String, String> = [
3474 ("connector", "iceberg"),
3475 ("type", "append-only"),
3476 ("force_append_only", "true"),
3477 ("catalog.name", "gcs-catalog"),
3478 ("catalog.type", "rest"),
3479 ("catalog.uri", "https://example.com/iceberg/rest"),
3480 ("warehouse.path", "gs://my-bucket/warehouse"),
3481 ("catalog.security", "google"),
3482 ("catalog.io_impl", "org.apache.iceberg.gcp.gcs.GCSFileIO"),
3483 ("database.name", "test_db"),
3484 ("table.name", "test_table"),
3485 ]
3486 .into_iter()
3487 .map(|(k, v)| (k.to_owned(), v.to_owned()))
3488 .collect();
3489
3490 let iceberg_config = IcebergConfig::from_btreemap(values).unwrap();
3491
3492 assert_eq!(iceberg_config.catalog_type(), "rest");
3494
3495 assert_eq!(
3497 iceberg_config.common.catalog_io_impl.as_deref(),
3498 Some("org.apache.iceberg.gcp.gcs.GCSFileIO")
3499 );
3500
3501 assert_eq!(
3503 iceberg_config.common.catalog_security.as_deref(),
3504 Some("google")
3505 );
3506 }
3507
3508 #[test]
3509 fn test_config_constants_consistency() {
3510 assert_eq!(ENABLE_COMPACTION, "enable_compaction");
3513 assert_eq!(COMPACTION_INTERVAL_SEC, "compaction_interval_sec");
3514 assert_eq!(ENABLE_SNAPSHOT_EXPIRATION, "enable_snapshot_expiration");
3515 assert_eq!(WRITE_MODE, "write_mode");
3516 assert_eq!(
3517 SNAPSHOT_EXPIRATION_RETAIN_LAST,
3518 "snapshot_expiration_retain_last"
3519 );
3520 assert_eq!(
3521 SNAPSHOT_EXPIRATION_MAX_AGE_MILLIS,
3522 "snapshot_expiration_max_age_millis"
3523 );
3524 assert_eq!(
3525 SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_FILES,
3526 "snapshot_expiration_clear_expired_files"
3527 );
3528 assert_eq!(
3529 SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_META_DATA,
3530 "snapshot_expiration_clear_expired_meta_data"
3531 );
3532 assert_eq!(COMPACTION_MAX_SNAPSHOTS_NUM, "compaction.max_snapshots_num");
3533 }
3534
3535 #[test]
3536 fn test_parse_iceberg_compaction_config() {
3537 let values = [
3539 ("connector", "iceberg"),
3540 ("type", "upsert"),
3541 ("primary_key", "id"),
3542 ("warehouse.path", "s3://iceberg"),
3543 ("s3.endpoint", "http://127.0.0.1:9301"),
3544 ("s3.access.key", "test"),
3545 ("s3.secret.key", "test"),
3546 ("s3.region", "us-east-1"),
3547 ("catalog.type", "storage"),
3548 ("catalog.name", "demo"),
3549 ("database.name", "test_db"),
3550 ("table.name", "test_table"),
3551 ("enable_compaction", "true"),
3552 ("compaction.max_snapshots_num", "100"),
3553 ("compaction.small_files_threshold_mb", "512"),
3554 ("compaction.delete_files_count_threshold", "50"),
3555 ("compaction.trigger_snapshot_count", "10"),
3556 ("compaction.target_file_size_mb", "256"),
3557 ("compaction.type", "full"),
3558 ]
3559 .into_iter()
3560 .map(|(k, v)| (k.to_owned(), v.to_owned()))
3561 .collect();
3562
3563 let iceberg_config = IcebergConfig::from_btreemap(values).unwrap();
3564
3565 assert!(iceberg_config.enable_compaction);
3567 assert_eq!(
3568 iceberg_config.max_snapshots_num_before_compaction,
3569 Some(100)
3570 );
3571 assert_eq!(iceberg_config.small_files_threshold_mb, Some(512));
3572 assert_eq!(iceberg_config.delete_files_count_threshold, Some(50));
3573 assert_eq!(iceberg_config.trigger_snapshot_count, Some(10));
3574 assert_eq!(iceberg_config.target_file_size_mb, Some(256));
3575 assert_eq!(iceberg_config.compaction_type, Some(CompactionType::Full));
3576 }
3577
3578 #[test]
3579 fn test_append_only_rejects_copy_on_write() {
3580 let values = [
3582 ("connector", "iceberg"),
3583 ("type", "append-only"),
3584 ("warehouse.path", "s3://iceberg"),
3585 ("s3.endpoint", "http://127.0.0.1:9301"),
3586 ("s3.access.key", "test"),
3587 ("s3.secret.key", "test"),
3588 ("s3.region", "us-east-1"),
3589 ("catalog.type", "storage"),
3590 ("catalog.name", "demo"),
3591 ("database.name", "test_db"),
3592 ("table.name", "test_table"),
3593 ("write_mode", "copy-on-write"),
3594 ]
3595 .into_iter()
3596 .map(|(k, v)| (k.to_owned(), v.to_owned()))
3597 .collect();
3598
3599 let result = IcebergConfig::from_btreemap(values);
3600 assert!(result.is_err());
3601 assert!(
3602 result
3603 .unwrap_err()
3604 .to_string()
3605 .contains("'copy-on-write' mode is not supported for append-only iceberg sink")
3606 );
3607 }
3608
3609 #[test]
3610 fn test_append_only_accepts_merge_on_read() {
3611 let values = [
3613 ("connector", "iceberg"),
3614 ("type", "append-only"),
3615 ("warehouse.path", "s3://iceberg"),
3616 ("s3.endpoint", "http://127.0.0.1:9301"),
3617 ("s3.access.key", "test"),
3618 ("s3.secret.key", "test"),
3619 ("s3.region", "us-east-1"),
3620 ("catalog.type", "storage"),
3621 ("catalog.name", "demo"),
3622 ("database.name", "test_db"),
3623 ("table.name", "test_table"),
3624 ("write_mode", "merge-on-read"),
3625 ]
3626 .into_iter()
3627 .map(|(k, v)| (k.to_owned(), v.to_owned()))
3628 .collect();
3629
3630 let result = IcebergConfig::from_btreemap(values);
3631 assert!(result.is_ok());
3632 let config = result.unwrap();
3633 assert_eq!(config.write_mode, IcebergWriteMode::MergeOnRead);
3634 }
3635
3636 #[test]
3637 fn test_append_only_defaults_to_merge_on_read() {
3638 let values = [
3640 ("connector", "iceberg"),
3641 ("type", "append-only"),
3642 ("warehouse.path", "s3://iceberg"),
3643 ("s3.endpoint", "http://127.0.0.1:9301"),
3644 ("s3.access.key", "test"),
3645 ("s3.secret.key", "test"),
3646 ("s3.region", "us-east-1"),
3647 ("catalog.type", "storage"),
3648 ("catalog.name", "demo"),
3649 ("database.name", "test_db"),
3650 ("table.name", "test_table"),
3651 ]
3652 .into_iter()
3653 .map(|(k, v)| (k.to_owned(), v.to_owned()))
3654 .collect();
3655
3656 let result = IcebergConfig::from_btreemap(values);
3657 assert!(result.is_ok());
3658 let config = result.unwrap();
3659 assert_eq!(config.write_mode, IcebergWriteMode::MergeOnRead);
3660 }
3661
3662 #[test]
3663 fn test_upsert_accepts_copy_on_write() {
3664 let values = [
3666 ("connector", "iceberg"),
3667 ("type", "upsert"),
3668 ("primary_key", "id"),
3669 ("warehouse.path", "s3://iceberg"),
3670 ("s3.endpoint", "http://127.0.0.1:9301"),
3671 ("s3.access.key", "test"),
3672 ("s3.secret.key", "test"),
3673 ("s3.region", "us-east-1"),
3674 ("catalog.type", "storage"),
3675 ("catalog.name", "demo"),
3676 ("database.name", "test_db"),
3677 ("table.name", "test_table"),
3678 ("write_mode", "copy-on-write"),
3679 ]
3680 .into_iter()
3681 .map(|(k, v)| (k.to_owned(), v.to_owned()))
3682 .collect();
3683
3684 let result = IcebergConfig::from_btreemap(values);
3685 assert!(result.is_ok());
3686 let config = result.unwrap();
3687 assert_eq!(config.write_mode, IcebergWriteMode::CopyOnWrite);
3688 }
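
    // The tests below are minimal sanity-check sketches for a few free functions in this
    // module. They assume the `Transform` variants imported from the iceberg crate at the
    // top of this module and the `ICEBERG_COW_BRANCH` constant used by `commit_branch`;
    // they are not exhaustive.

    #[test]
    fn test_parse_partition_by_exprs_sketch() {
        use super::*;

        let parsed =
            parse_partition_by_exprs("v1, bucket(5,v2), truncate(4, v3)".to_owned()).unwrap();
        assert_eq!(parsed.len(), 3);
        assert_eq!(parsed[0].0, "v1");
        assert!(matches!(&parsed[0].1, Transform::Identity));
        assert_eq!(parsed[1].0, "v2");
        assert!(matches!(&parsed[1].1, Transform::Bucket(5)));
        assert_eq!(parsed[2].0, "v3");
        assert!(matches!(&parsed[2].1, Transform::Truncate(4)));

        // `bucket` and `truncate` require the width argument.
        assert!(parse_partition_by_exprs("bucket(v1)".to_owned()).is_err());
    }

    #[test]
    fn test_commit_branch_selection_sketch() {
        use super::*;

        // Only upsert sinks in copy-on-write mode target the dedicated COW branch.
        assert!(should_enable_iceberg_cow(
            SINK_TYPE_UPSERT,
            IcebergWriteMode::CopyOnWrite
        ));
        assert!(!should_enable_iceberg_cow(
            SINK_TYPE_UPSERT,
            IcebergWriteMode::MergeOnRead
        ));
        assert!(!should_enable_iceberg_cow(
            SINK_TYPE_APPEND_ONLY,
            IcebergWriteMode::CopyOnWrite
        ));

        assert_eq!(
            commit_branch(SINK_TYPE_UPSERT, IcebergWriteMode::CopyOnWrite),
            ICEBERG_COW_BRANCH
        );
        assert_eq!(
            commit_branch(SINK_TYPE_UPSERT, IcebergWriteMode::MergeOnRead),
            MAIN_BRANCH
        );
    }

    #[test]
    fn test_metadata_serde_round_trip_sketch() {
        use super::*;

        // Pre-commit metadata is a JSON-encoded `Vec<Vec<u8>>`; it should round-trip losslessly.
        let payload = vec![vec![1u8, 2, 3], vec![], vec![42]];
        let bytes = serialize_metadata(payload.clone());
        assert_eq!(deserialize_metadata(bytes), payload);
    }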
3689}