mod prometheus;

use std::collections::{BTreeMap, HashMap};
use std::fmt::Debug;
use std::num::NonZeroU64;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;

use anyhow::{Context, anyhow};
use async_trait::async_trait;
use await_tree::InstrumentAwait;
use iceberg::arrow::{
    RecordBatchPartitionSplitter, arrow_schema_to_schema, schema_to_arrow_schema,
};
use iceberg::spec::{
    DataFile, FormatVersion, MAIN_BRANCH, Operation, PartitionSpecRef,
    SchemaRef as IcebergSchemaRef, SerializedDataFile, TableProperties, Transform,
    UnboundPartitionField, UnboundPartitionSpec,
};
use iceberg::table::Table;
use iceberg::transaction::{ApplyTransactionAction, FastAppendAction, Transaction};
use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
use iceberg::writer::base_writer::deletion_vector_writer::{
    DeletionVectorWriter, DeletionVectorWriterBuilder,
};
use iceberg::writer::base_writer::equality_delete_writer::{
    EqualityDeleteFileWriterBuilder, EqualityDeleteWriterConfig,
};
use iceberg::writer::base_writer::position_delete_file_writer::{
    POSITION_DELETE_SCHEMA, PositionDeleteFileWriter, PositionDeleteFileWriterBuilder,
    PositionDeleteInput,
};
use iceberg::writer::delta_writer::{DELETE_OP, DeltaWriterBuilder, INSERT_OP};
use iceberg::writer::file_writer::ParquetWriterBuilder;
use iceberg::writer::file_writer::location_generator::{
    DefaultFileNameGenerator, DefaultLocationGenerator,
};
use iceberg::writer::file_writer::rolling_writer::RollingFileWriterBuilder;
use iceberg::writer::task_writer::TaskWriter;
use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
use iceberg::{Catalog, NamespaceIdent, TableCreation, TableIdent};
use itertools::Itertools;
use parquet::basic::Compression;
use parquet::file::properties::WriterProperties;
use prometheus::monitored_general_writer::MonitoredGeneralWriterBuilder;
use regex::Regex;
use risingwave_common::array::arrow::arrow_array_iceberg::{Int32Array, RecordBatch};
use risingwave_common::array::arrow::arrow_schema_iceberg::{
    self, DataType as ArrowDataType, Field as ArrowField, Fields as ArrowFields,
    Schema as ArrowSchema, SchemaRef,
};
use risingwave_common::array::arrow::{IcebergArrowConvert, IcebergCreateTableArrowConvert};
use risingwave_common::array::{Op, StreamChunk};
use risingwave_common::bail;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{Field, Schema};
use risingwave_common::error::IcebergError;
use risingwave_common::metrics::{LabelGuardedHistogram, LabelGuardedIntCounter};
use risingwave_common_estimate_size::EstimateSize;
use risingwave_pb::connector_service::SinkMetadata;
use risingwave_pb::connector_service::sink_metadata::Metadata::Serialized;
use risingwave_pb::connector_service::sink_metadata::SerializedMetadata;
use risingwave_pb::stream_plan::PbSinkSchemaChange;
use serde::de::{self, Deserializer, Visitor};
use serde::{Deserialize, Serialize};
use serde_json::from_value;
use serde_with::{DisplayFromStr, serde_as};
use thiserror_ext::AsReport;
use tokio::sync::mpsc::UnboundedSender;
use tokio_retry::RetryIf;
use tokio_retry::strategy::{ExponentialBackoff, jitter};
use tracing::warn;
use url::Url;
use uuid::Uuid;
use with_options::WithOptions;

use super::decouple_checkpoint_log_sink::iceberg_default_commit_checkpoint_interval;
use super::{
    GLOBAL_SINK_METRICS, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT, Sink,
    SinkError, SinkWriterParam,
};
use crate::connector_common::{IcebergCommon, IcebergSinkCompactionUpdate, IcebergTableIdentifier};
use crate::enforce_secret::EnforceSecret;
use crate::sink::catalog::SinkId;
use crate::sink::coordinate::CoordinatedLogSinker;
use crate::sink::writer::SinkWriter;
use crate::sink::{
    Result, SinglePhaseCommitCoordinator, SinkCommitCoordinator, SinkParam,
    TwoPhaseCommitCoordinator,
};
use crate::{deserialize_bool_from_string, deserialize_optional_string_seq_from_string};

pub const ICEBERG_SINK: &str = "iceberg";

pub const ICEBERG_COW_BRANCH: &str = "ingestion";
pub const ICEBERG_WRITE_MODE_MERGE_ON_READ: &str = "merge-on-read";
pub const ICEBERG_WRITE_MODE_COPY_ON_WRITE: &str = "copy-on-write";
pub const ICEBERG_COMPACTION_TYPE_FULL: &str = "full";
pub const ICEBERG_COMPACTION_TYPE_SMALL_FILES: &str = "small-files";
pub const ICEBERG_COMPACTION_TYPE_FILES_WITH_DELETE: &str = "files-with-delete";

pub const PARTITION_DATA_ID_START: i32 = 1000;
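
/// How changes are applied to the table: `merge-on-read` writes delete files
/// that readers merge at query time, while `copy-on-write` rewrites the
/// affected data files at write time.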
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
pub enum IcebergWriteMode {
    #[default]
    MergeOnRead,
    CopyOnWrite,
}

impl IcebergWriteMode {
    pub fn as_str(self) -> &'static str {
        match self {
            IcebergWriteMode::MergeOnRead => ICEBERG_WRITE_MODE_MERGE_ON_READ,
            IcebergWriteMode::CopyOnWrite => ICEBERG_WRITE_MODE_COPY_ON_WRITE,
        }
    }
}

impl std::str::FromStr for IcebergWriteMode {
    type Err = SinkError;

    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
        match s {
            ICEBERG_WRITE_MODE_MERGE_ON_READ => Ok(IcebergWriteMode::MergeOnRead),
            ICEBERG_WRITE_MODE_COPY_ON_WRITE => Ok(IcebergWriteMode::CopyOnWrite),
            _ => Err(SinkError::Config(anyhow!(format!(
                "invalid write_mode: {}, must be one of: {}, {}",
                s, ICEBERG_WRITE_MODE_MERGE_ON_READ, ICEBERG_WRITE_MODE_COPY_ON_WRITE
            )))),
        }
    }
}

impl TryFrom<&str> for IcebergWriteMode {
    type Error = <Self as std::str::FromStr>::Err;

    fn try_from(value: &str) -> std::result::Result<Self, Self::Error> {
        value.parse()
    }
}

impl TryFrom<String> for IcebergWriteMode {
    type Error = <Self as std::str::FromStr>::Err;

    fn try_from(value: String) -> std::result::Result<Self, Self::Error> {
        value.as_str().parse()
    }
}

impl std::fmt::Display for IcebergWriteMode {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(self.as_str())
    }
}

pub const ENABLE_COMPACTION: &str = "enable_compaction";
pub const COMPACTION_INTERVAL_SEC: &str = "compaction_interval_sec";
pub const ENABLE_SNAPSHOT_EXPIRATION: &str = "enable_snapshot_expiration";
pub const WRITE_MODE: &str = "write_mode";
pub const FORMAT_VERSION: &str = "format_version";
pub const SNAPSHOT_EXPIRATION_RETAIN_LAST: &str = "snapshot_expiration_retain_last";
pub const SNAPSHOT_EXPIRATION_MAX_AGE_MILLIS: &str = "snapshot_expiration_max_age_millis";
pub const SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_FILES: &str = "snapshot_expiration_clear_expired_files";
pub const SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_META_DATA: &str =
    "snapshot_expiration_clear_expired_meta_data";
pub const COMPACTION_MAX_SNAPSHOTS_NUM: &str = "compaction.max_snapshots_num";

pub const COMPACTION_SMALL_FILES_THRESHOLD_MB: &str = "compaction.small_files_threshold_mb";

pub const COMPACTION_DELETE_FILES_COUNT_THRESHOLD: &str = "compaction.delete_files_count_threshold";

pub const COMPACTION_TRIGGER_SNAPSHOT_COUNT: &str = "compaction.trigger_snapshot_count";

pub const COMPACTION_TARGET_FILE_SIZE_MB: &str = "compaction.target_file_size_mb";

pub const COMPACTION_TYPE: &str = "compaction.type";

pub const COMPACTION_WRITE_PARQUET_COMPRESSION: &str = "compaction.write_parquet_compression";
pub const COMPACTION_WRITE_PARQUET_MAX_ROW_GROUP_ROWS: &str =
    "compaction.write_parquet_max_row_group_rows";

const PARQUET_CREATED_BY: &str = concat!("risingwave version ", env!("CARGO_PKG_VERSION"));

fn default_commit_retry_num() -> u32 {
    8
}

fn default_iceberg_write_mode() -> IcebergWriteMode {
    IcebergWriteMode::MergeOnRead
}

fn default_iceberg_format_version() -> FormatVersion {
    FormatVersion::V2
}

fn default_true() -> bool {
    true
}

fn default_some_true() -> Option<bool> {
    Some(true)
}
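
/// Parses the user-facing `format_version` option; only Iceberg format
/// versions 1, 2, and 3 are accepted.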
fn parse_format_version_str(value: &str) -> std::result::Result<FormatVersion, String> {
    let parsed = value
        .trim()
        .parse::<u8>()
        .map_err(|_| "`format-version` must be one of 1, 2, or 3".to_owned())?;
    match parsed {
        1 => Ok(FormatVersion::V1),
        2 => Ok(FormatVersion::V2),
        3 => Ok(FormatVersion::V3),
        _ => Err("`format-version` must be one of 1, 2, or 3".to_owned()),
    }
}

fn deserialize_format_version<'de, D>(
    deserializer: D,
) -> std::result::Result<FormatVersion, D::Error>
where
    D: Deserializer<'de>,
{
    struct FormatVersionVisitor;

    impl<'de> Visitor<'de> for FormatVersionVisitor {
        type Value = FormatVersion;

        fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
            formatter.write_str("format-version as 1, 2, or 3")
        }

        fn visit_u64<E>(self, value: u64) -> std::result::Result<Self::Value, E>
        where
            E: de::Error,
        {
            let value = u8::try_from(value)
                .map_err(|_| E::custom("`format-version` must be one of 1, 2, or 3"))?;
            parse_format_version_str(&value.to_string()).map_err(E::custom)
        }

        fn visit_i64<E>(self, value: i64) -> std::result::Result<Self::Value, E>
        where
            E: de::Error,
        {
            let value = u8::try_from(value)
                .map_err(|_| E::custom("`format-version` must be one of 1, 2, or 3"))?;
            parse_format_version_str(&value.to_string()).map_err(E::custom)
        }

        fn visit_str<E>(self, value: &str) -> std::result::Result<Self::Value, E>
        where
            E: de::Error,
        {
            parse_format_version_str(value).map_err(E::custom)
        }

        fn visit_string<E>(self, value: String) -> std::result::Result<Self::Value, E>
        where
            E: de::Error,
        {
            self.visit_str(&value)
        }
    }

    deserializer.deserialize_any(FormatVersionVisitor)
}
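
/// Compaction strategy for the sink table, configured via `compaction.type`.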
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(rename_all = "kebab-case")]
pub enum CompactionType {
    #[default]
    Full,
    SmallFiles,
    FilesWithDelete,
}

impl CompactionType {
    pub fn as_str(&self) -> &'static str {
        match self {
            CompactionType::Full => ICEBERG_COMPACTION_TYPE_FULL,
            CompactionType::SmallFiles => ICEBERG_COMPACTION_TYPE_SMALL_FILES,
            CompactionType::FilesWithDelete => ICEBERG_COMPACTION_TYPE_FILES_WITH_DELETE,
        }
    }
}

impl std::str::FromStr for CompactionType {
    type Err = SinkError;

    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
        match s {
            ICEBERG_COMPACTION_TYPE_FULL => Ok(CompactionType::Full),
            ICEBERG_COMPACTION_TYPE_SMALL_FILES => Ok(CompactionType::SmallFiles),
            ICEBERG_COMPACTION_TYPE_FILES_WITH_DELETE => Ok(CompactionType::FilesWithDelete),
            _ => Err(SinkError::Config(anyhow!(format!(
                "invalid compaction_type: {}, must be one of: {}, {}, {}",
                s,
                ICEBERG_COMPACTION_TYPE_FULL,
                ICEBERG_COMPACTION_TYPE_SMALL_FILES,
                ICEBERG_COMPACTION_TYPE_FILES_WITH_DELETE
            )))),
        }
    }
}

impl TryFrom<&str> for CompactionType {
    type Error = <Self as std::str::FromStr>::Err;

    fn try_from(value: &str) -> std::result::Result<Self, Self::Error> {
        value.parse()
    }
}

impl TryFrom<String> for CompactionType {
    type Error = <Self as std::str::FromStr>::Err;

    fn try_from(value: String) -> std::result::Result<Self, Self::Error> {
        value.as_str().parse()
    }
}

impl std::fmt::Display for CompactionType {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}", self.as_str())
    }
}
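
/// Configuration for the Iceberg sink, deserialized from the options of a
/// `CREATE SINK ... WITH (connector = 'iceberg', ...)` statement.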
#[serde_as]
#[derive(Debug, Clone, PartialEq, Eq, Deserialize, WithOptions)]
pub struct IcebergConfig {
    pub r#type: String,

    #[serde(default, deserialize_with = "deserialize_bool_from_string")]
    pub force_append_only: bool,

    #[serde(flatten)]
    common: IcebergCommon,

    #[serde(flatten)]
    table: IcebergTableIdentifier,

    #[serde(
        rename = "primary_key",
        default,
        deserialize_with = "deserialize_optional_string_seq_from_string"
    )]
    pub primary_key: Option<Vec<String>>,

    #[serde(skip)]
    pub java_catalog_props: HashMap<String, String>,

    #[serde(default)]
    pub partition_by: Option<String>,

    #[serde(default = "iceberg_default_commit_checkpoint_interval")]
    #[serde_as(as = "DisplayFromStr")]
    #[with_option(allow_alter_on_fly)]
    pub commit_checkpoint_interval: u64,

    #[serde(default, deserialize_with = "deserialize_bool_from_string")]
    pub create_table_if_not_exists: bool,

    #[serde(default = "default_some_true")]
    #[serde_as(as = "Option<DisplayFromStr>")]
    pub is_exactly_once: Option<bool>,

    #[serde(default = "default_commit_retry_num")]
    pub commit_retry_num: u32,

    #[serde(
        rename = "enable_compaction",
        default,
        deserialize_with = "deserialize_bool_from_string"
    )]
    #[with_option(allow_alter_on_fly)]
    pub enable_compaction: bool,

    #[serde(rename = "compaction_interval_sec", default)]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    pub compaction_interval_sec: Option<u64>,

    #[serde(
        rename = "enable_snapshot_expiration",
        default,
        deserialize_with = "deserialize_bool_from_string"
    )]
    #[with_option(allow_alter_on_fly)]
    pub enable_snapshot_expiration: bool,

    #[serde(rename = "write_mode", default = "default_iceberg_write_mode")]
    pub write_mode: IcebergWriteMode,

    #[serde(
        rename = "format_version",
        default = "default_iceberg_format_version",
        deserialize_with = "deserialize_format_version"
    )]
    pub format_version: FormatVersion,

    #[serde(rename = "snapshot_expiration_max_age_millis", default)]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    pub snapshot_expiration_max_age_millis: Option<i64>,

    #[serde(rename = "snapshot_expiration_retain_last", default)]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    pub snapshot_expiration_retain_last: Option<i32>,

    #[serde(
        rename = "snapshot_expiration_clear_expired_files",
        default = "default_true",
        deserialize_with = "deserialize_bool_from_string"
    )]
    #[with_option(allow_alter_on_fly)]
    pub snapshot_expiration_clear_expired_files: bool,

    #[serde(
        rename = "snapshot_expiration_clear_expired_meta_data",
        default = "default_true",
        deserialize_with = "deserialize_bool_from_string"
    )]
    #[with_option(allow_alter_on_fly)]
    pub snapshot_expiration_clear_expired_meta_data: bool,

    #[serde(rename = "compaction.max_snapshots_num", default)]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    pub max_snapshots_num_before_compaction: Option<usize>,

    #[serde(rename = "compaction.small_files_threshold_mb", default)]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    pub small_files_threshold_mb: Option<u64>,

    #[serde(rename = "compaction.delete_files_count_threshold", default)]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    pub delete_files_count_threshold: Option<usize>,

    #[serde(rename = "compaction.trigger_snapshot_count", default)]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    pub trigger_snapshot_count: Option<usize>,

    #[serde(rename = "compaction.target_file_size_mb", default)]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    pub target_file_size_mb: Option<u64>,

    #[serde(rename = "compaction.type", default)]
    #[with_option(allow_alter_on_fly)]
    pub compaction_type: Option<CompactionType>,

    #[serde(rename = "compaction.write_parquet_compression", default)]
    #[with_option(allow_alter_on_fly)]
    pub write_parquet_compression: Option<String>,

    #[serde(rename = "compaction.write_parquet_max_row_group_rows", default)]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    pub write_parquet_max_row_group_rows: Option<usize>,
}

impl EnforceSecret for IcebergConfig {
    fn enforce_secret<'a>(
        prop_iter: impl Iterator<Item = &'a str>,
    ) -> crate::error::ConnectorResult<()> {
        for prop in prop_iter {
            IcebergCommon::enforce_one(prop)?;
        }
        Ok(())
    }

    fn enforce_one(prop: &str) -> crate::error::ConnectorResult<()> {
        IcebergCommon::enforce_one(prop)
    }
}

impl IcebergConfig {
    fn validate_append_only_write_mode(
        sink_type: &str,
        write_mode: IcebergWriteMode,
    ) -> Result<()> {
        if sink_type == SINK_TYPE_APPEND_ONLY && write_mode == IcebergWriteMode::CopyOnWrite {
            return Err(SinkError::Config(anyhow!(
                "'copy-on-write' mode is not supported for append-only iceberg sink. \
                Please use 'merge-on-read' instead, which is strictly better for append-only workloads."
            )));
        }
        Ok(())
    }
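
    /// Builds an [`IcebergConfig`] from raw string options, typically the
    /// `WITH` clause of a `CREATE SINK` statement.
    ///
    /// A minimal sketch of usage (catalog and storage options omitted here;
    /// the exact keys follow the `serde` renames on [`IcebergConfig`], and the
    /// option values below are illustrative):
    ///
    /// ```ignore
    /// let mut options = BTreeMap::new();
    /// options.insert("type".to_owned(), "upsert".to_owned());
    /// options.insert("primary_key".to_owned(), "id".to_owned());
    /// options.insert("write_mode".to_owned(), "merge-on-read".to_owned());
    /// let config = IcebergConfig::from_btreemap(options)?;
    /// assert_eq!(config.write_mode, IcebergWriteMode::MergeOnRead);
    /// ```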
    pub fn from_btreemap(values: BTreeMap<String, String>) -> Result<Self> {
        let mut config =
            serde_json::from_value::<IcebergConfig>(serde_json::to_value(&values).unwrap())
                .map_err(|e| SinkError::Config(anyhow!(e)))?;

        if config.r#type != SINK_TYPE_APPEND_ONLY && config.r#type != SINK_TYPE_UPSERT {
            return Err(SinkError::Config(anyhow!(
                "`{}` must be {}, or {}",
                SINK_TYPE_OPTION,
                SINK_TYPE_APPEND_ONLY,
                SINK_TYPE_UPSERT
            )));
        }

        if config.r#type == SINK_TYPE_UPSERT {
            if let Some(primary_key) = &config.primary_key {
                if primary_key.is_empty() {
                    return Err(SinkError::Config(anyhow!(
                        "`primary-key` must not be empty in {}",
                        SINK_TYPE_UPSERT
                    )));
                }
            } else {
                return Err(SinkError::Config(anyhow!(
                    "Must set `primary-key` in {}",
                    SINK_TYPE_UPSERT
                )));
            }
        }

        Self::validate_append_only_write_mode(&config.r#type, config.write_mode)?;

        config.java_catalog_props = values
            .iter()
            .filter(|(k, _v)| {
                k.starts_with("catalog.")
                    && k != &"catalog.uri"
                    && k != &"catalog.type"
                    && k != &"catalog.name"
                    && k != &"catalog.header"
            })
            .map(|(k, v)| (k[8..].to_string(), v.clone()))
            .collect();

        if config.commit_checkpoint_interval == 0 {
            return Err(SinkError::Config(anyhow!(
                "`commit-checkpoint-interval` must be greater than 0"
            )));
        }

        Ok(config)
    }

    pub fn catalog_type(&self) -> &str {
        self.common.catalog_type()
    }

    pub async fn load_table(&self) -> Result<Table> {
        self.common
            .load_table(&self.table, &self.java_catalog_props)
            .await
            .map_err(Into::into)
    }

    pub async fn create_catalog(&self) -> Result<Arc<dyn Catalog>> {
        self.common
            .create_catalog(&self.java_catalog_props)
            .await
            .map_err(Into::into)
    }

    pub fn full_table_name(&self) -> Result<TableIdent> {
        self.table.to_table_ident().map_err(Into::into)
    }

    pub fn catalog_name(&self) -> String {
        self.common.catalog_name()
    }

    pub fn table_format_version(&self) -> FormatVersion {
        self.format_version
    }

    pub fn compaction_interval_sec(&self) -> u64 {
        self.compaction_interval_sec.unwrap_or(3600)
    }

    pub fn snapshot_expiration_timestamp_ms(&self, current_time_ms: i64) -> Option<i64> {
        self.snapshot_expiration_max_age_millis
            .map(|max_age_millis| current_time_ms - max_age_millis)
    }

    pub fn trigger_snapshot_count(&self) -> usize {
        self.trigger_snapshot_count.unwrap_or(usize::MAX)
    }

    pub fn small_files_threshold_mb(&self) -> u64 {
        self.small_files_threshold_mb.unwrap_or(64)
    }

    pub fn delete_files_count_threshold(&self) -> usize {
        self.delete_files_count_threshold.unwrap_or(256)
    }

    pub fn target_file_size_mb(&self) -> u64 {
        self.target_file_size_mb.unwrap_or(1024)
    }

    pub fn compaction_type(&self) -> CompactionType {
        self.compaction_type.unwrap_or_default()
    }

    pub fn write_parquet_compression(&self) -> &str {
        self.write_parquet_compression.as_deref().unwrap_or("zstd")
    }

    pub fn write_parquet_max_row_group_rows(&self) -> usize {
        self.write_parquet_max_row_group_rows.unwrap_or(122880)
    }

    pub fn get_parquet_compression(&self) -> Compression {
        parse_parquet_compression(self.write_parquet_compression())
    }
}
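
/// Maps a user-supplied codec name to a Parquet [`Compression`], falling back
/// to `SNAPPY` (with a warning) for unrecognized codecs.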
fn parse_parquet_compression(codec: &str) -> Compression {
    match codec.to_lowercase().as_str() {
        "uncompressed" => Compression::UNCOMPRESSED,
        "snappy" => Compression::SNAPPY,
        "gzip" => Compression::GZIP(Default::default()),
        "lzo" => Compression::LZO,
        "brotli" => Compression::BROTLI(Default::default()),
        "lz4" => Compression::LZ4,
        "zstd" => Compression::ZSTD(Default::default()),
        _ => {
            tracing::warn!(
                "Unknown compression codec '{}', falling back to SNAPPY",
                codec
            );
            Compression::SNAPPY
        }
    }
}

pub struct IcebergSink {
    pub config: IcebergConfig,
    param: SinkParam,
    unique_column_ids: Option<Vec<usize>>,
}

impl EnforceSecret for IcebergSink {
    fn enforce_secret<'a>(
        prop_iter: impl Iterator<Item = &'a str>,
    ) -> crate::error::ConnectorResult<()> {
        for prop in prop_iter {
            IcebergConfig::enforce_one(prop)?;
        }
        Ok(())
    }
}

impl TryFrom<SinkParam> for IcebergSink {
    type Error = SinkError;

    fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
        let config = IcebergConfig::from_btreemap(param.properties.clone())?;
        IcebergSink::new(config, param)
    }
}

impl Debug for IcebergSink {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("IcebergSink")
            .field("config", &self.config)
            .finish()
    }
}
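
/// Loads the target table (creating it first when `create_table_if_not_exists`
/// is set) and verifies that the sink schema matches the table's Arrow schema.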
async fn create_and_validate_table_impl(
    config: &IcebergConfig,
    param: &SinkParam,
) -> Result<Table> {
    if config.create_table_if_not_exists {
        create_table_if_not_exists_impl(config, param).await?;
    }

    let table = config
        .load_table()
        .await
        .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;

    let sink_schema = param.schema();
    let iceberg_arrow_schema = schema_to_arrow_schema(table.metadata().current_schema())
        .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;

    try_matches_arrow_schema(&sink_schema, &iceberg_arrow_schema)
        .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;

    Ok(table)
}
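
/// Creates the namespace and table in the catalog if they do not already
/// exist, deriving the Iceberg schema, table location, and partition spec from
/// the sink parameters and config.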
async fn create_table_if_not_exists_impl(config: &IcebergConfig, param: &SinkParam) -> Result<()> {
    let catalog = config.create_catalog().await?;
    let namespace = if let Some(database_name) = config.table.database_name() {
        let namespace = NamespaceIdent::new(database_name.to_owned());
        if !catalog
            .namespace_exists(&namespace)
            .await
            .map_err(|e| SinkError::Iceberg(anyhow!(e)))?
        {
            catalog
                .create_namespace(&namespace, HashMap::default())
                .await
                .map_err(|e| SinkError::Iceberg(anyhow!(e)))
                .context("failed to create iceberg namespace")?;
        }
        namespace
    } else {
        bail!("database name must be set if you want to create table")
    };

    let table_id = config
        .full_table_name()
        .context("Unable to parse table name")?;
    if !catalog
        .table_exists(&table_id)
        .await
        .map_err(|e| SinkError::Iceberg(anyhow!(e)))?
    {
        let iceberg_create_table_arrow_convert = IcebergCreateTableArrowConvert::default();
        let arrow_fields = param
            .columns
            .iter()
            .map(|column| {
                Ok(iceberg_create_table_arrow_convert
                    .to_arrow_field(&column.name, &column.data_type)
                    .map_err(|e| SinkError::Iceberg(anyhow!(e)))
                    .context(format!(
                        "failed to convert {}: {} to arrow type",
                        &column.name, &column.data_type
                    ))?)
            })
            .collect::<Result<Vec<ArrowField>>>()?;
        let arrow_schema = arrow_schema_iceberg::Schema::new(arrow_fields);
        let iceberg_schema = iceberg::arrow::arrow_schema_to_schema(&arrow_schema)
            .map_err(|e| SinkError::Iceberg(anyhow!(e)))
            .context("failed to convert arrow schema to iceberg schema")?;

        let location = {
            let mut names = namespace.clone().inner();
            names.push(config.table.table_name().to_owned());
            match &config.common.warehouse_path {
                Some(warehouse_path) => {
                    let is_s3_tables = warehouse_path.starts_with("arn:aws:s3tables");
                    let is_bq_catalog_federation = warehouse_path.starts_with("bq://");
                    let url = Url::parse(warehouse_path);
                    if url.is_err() || is_s3_tables || is_bq_catalog_federation {
                        if config.common.catalog_type() == "rest"
                            || config.common.catalog_type() == "rest_rust"
                        {
                            None
                        } else {
                            bail!(format!("Invalid warehouse path: {}", warehouse_path))
                        }
                    } else if warehouse_path.ends_with('/') {
                        Some(format!("{}{}", warehouse_path, names.join("/")))
                    } else {
                        Some(format!("{}/{}", warehouse_path, names.join("/")))
                    }
                }
                None => None,
            }
        };

        let partition_spec = match &config.partition_by {
            Some(partition_by) => {
                let mut partition_fields = Vec::<UnboundPartitionField>::new();
                for (i, (column, transform)) in parse_partition_by_exprs(partition_by.clone())?
                    .into_iter()
                    .enumerate()
                {
                    match iceberg_schema.field_id_by_name(&column) {
                        Some(id) => partition_fields.push(
                            UnboundPartitionField::builder()
                                .source_id(id)
                                .transform(transform)
                                .name(format!("_p_{}", column))
                                .field_id(PARTITION_DATA_ID_START + i as i32)
                                .build(),
                        ),
                        None => bail!(format!(
                            "Partition source column does not exist in schema: {}",
                            column
                        )),
                    };
                }
                Some(
                    UnboundPartitionSpec::builder()
                        .with_spec_id(0)
                        .add_partition_fields(partition_fields)
                        .map_err(|e| SinkError::Iceberg(anyhow!(e)))
                        .context("failed to add partition columns")?
                        .build(),
                )
            }
            None => None,
        };

        let properties = HashMap::from([(
            TableProperties::PROPERTY_FORMAT_VERSION.to_owned(),
            (config.format_version as u8).to_string(),
        )]);

        let table_creation_builder = TableCreation::builder()
            .name(config.table.table_name().to_owned())
            .schema(iceberg_schema)
            .format_version(config.table_format_version())
            .properties(properties);

        let table_creation = match (location, partition_spec) {
            (Some(location), Some(partition_spec)) => table_creation_builder
                .location(location)
                .partition_spec(partition_spec)
                .build(),
            (Some(location), None) => table_creation_builder.location(location).build(),
            (None, Some(partition_spec)) => table_creation_builder
                .partition_spec(partition_spec)
                .build(),
            (None, None) => table_creation_builder.build(),
        };

        catalog
            .create_table(&namespace, table_creation)
            .await
            .map_err(|e| SinkError::Iceberg(anyhow!(e)))
            .context("failed to create iceberg table")?;
    }
    Ok(())
}

impl IcebergSink {
    pub async fn create_and_validate_table(&self) -> Result<Table> {
        create_and_validate_table_impl(&self.config, &self.param).await
    }

    pub async fn create_table_if_not_exists(&self) -> Result<()> {
        create_table_if_not_exists_impl(&self.config, &self.param).await
    }

    pub fn new(config: IcebergConfig, param: SinkParam) -> Result<Self> {
        let unique_column_ids = if config.r#type == SINK_TYPE_UPSERT && !config.force_append_only {
            if let Some(pk) = &config.primary_key {
                let mut unique_column_ids = Vec::with_capacity(pk.len());
                for col_name in pk {
                    let id = param
                        .columns
                        .iter()
                        .find(|col| col.name.as_str() == col_name)
                        .ok_or_else(|| {
                            SinkError::Config(anyhow!(
                                "Primary key column {} not found in sink schema",
                                col_name
                            ))
                        })?
                        .column_id
                        .get_id() as usize;
                    unique_column_ids.push(id);
                }
                Some(unique_column_ids)
            } else {
                unreachable!()
            }
        } else {
            None
        };
        Ok(Self {
            config,
            param,
            unique_column_ids,
        })
    }
}

impl Sink for IcebergSink {
    type LogSinker = CoordinatedLogSinker<IcebergSinkWriter>;

    const SINK_NAME: &'static str = ICEBERG_SINK;

    async fn validate(&self) -> Result<()> {
        if "snowflake".eq_ignore_ascii_case(self.config.catalog_type()) {
            bail!("Snowflake catalog only supports iceberg sources");
        }

        if "glue".eq_ignore_ascii_case(self.config.catalog_type()) {
            risingwave_common::license::Feature::IcebergSinkWithGlue
                .check_available()
                .map_err(|e| anyhow::anyhow!(e))?;
        }

        IcebergConfig::validate_append_only_write_mode(
            &self.config.r#type,
            self.config.write_mode,
        )?;

        let compaction_type = self.config.compaction_type();

        if self.config.write_mode == IcebergWriteMode::CopyOnWrite
            && compaction_type != CompactionType::Full
        {
            bail!(
                "'copy-on-write' mode only supports 'full' compaction type, got: '{}'",
                compaction_type
            );
        }

        match compaction_type {
            CompactionType::SmallFiles => {
                risingwave_common::license::Feature::IcebergCompaction
                    .check_available()
                    .map_err(|e| anyhow::anyhow!(e))?;

                if self.config.write_mode != IcebergWriteMode::MergeOnRead {
                    bail!(
                        "'small-files' compaction type only supports 'merge-on-read' write mode, got: '{}'",
                        self.config.write_mode
                    );
                }

                if self.config.delete_files_count_threshold.is_some() {
                    bail!(
                        "`compaction.delete-files-count-threshold` is not supported for 'small-files' compaction type"
                    );
                }
            }
            CompactionType::FilesWithDelete => {
                risingwave_common::license::Feature::IcebergCompaction
                    .check_available()
                    .map_err(|e| anyhow::anyhow!(e))?;

                if self.config.write_mode != IcebergWriteMode::MergeOnRead {
                    bail!(
                        "'files-with-delete' compaction type only supports 'merge-on-read' write mode, got: '{}'",
                        self.config.write_mode
                    );
                }

                if self.config.small_files_threshold_mb.is_some() {
                    bail!(
                        "`compaction.small-files-threshold-mb` must not be set for 'files-with-delete' compaction type"
                    );
                }
            }
            CompactionType::Full => {}
        }

        let _ = self.create_and_validate_table().await?;
        Ok(())
    }

    fn support_schema_change() -> bool {
        true
    }

    fn validate_alter_config(config: &BTreeMap<String, String>) -> Result<()> {
        let iceberg_config = IcebergConfig::from_btreemap(config.clone())?;

        if let Some(compaction_interval) = iceberg_config.compaction_interval_sec {
            if iceberg_config.enable_compaction && compaction_interval == 0 {
                bail!(
                    "`compaction-interval-sec` must be greater than 0 when `enable-compaction` is true"
                );
            }

            tracing::info!(
                "Alter config compaction_interval set to {} seconds",
                compaction_interval
            );
        }

        if let Some(max_snapshots) = iceberg_config.max_snapshots_num_before_compaction
            && max_snapshots < 1
        {
            bail!(
                "`compaction.max-snapshots-num` must be greater than 0, got: {}",
                max_snapshots
            );
        }

        if let Some(target_file_size_mb) = iceberg_config.target_file_size_mb
            && target_file_size_mb == 0
        {
            bail!("`compaction.target_file_size_mb` must be greater than 0");
        }

        if let Some(max_row_group_rows) = iceberg_config.write_parquet_max_row_group_rows
            && max_row_group_rows == 0
        {
            bail!("`compaction.write_parquet_max_row_group_rows` must be greater than 0");
        }

        if let Some(ref compression) = iceberg_config.write_parquet_compression {
            let valid_codecs = [
                "uncompressed",
                "snappy",
                "gzip",
                "lzo",
                "brotli",
                "lz4",
                "zstd",
            ];
            if !valid_codecs.contains(&compression.to_lowercase().as_str()) {
                bail!(
                    "`compaction.write_parquet_compression` must be one of {:?}, got: {}",
                    valid_codecs,
                    compression
                );
            }
        }

        Ok(())
    }

    async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
        let writer = IcebergSinkWriter::new(
            self.config.clone(),
            self.param.clone(),
            writer_param.clone(),
            self.unique_column_ids.clone(),
        );

        let commit_checkpoint_interval =
            NonZeroU64::new(self.config.commit_checkpoint_interval).expect(
                "commit_checkpoint_interval should be greater than 0, and it should be checked in config validation",
            );
        let log_sinker = CoordinatedLogSinker::new(
            &writer_param,
            self.param.clone(),
            writer,
            commit_checkpoint_interval,
        )
        .await?;

        Ok(log_sinker)
    }

    fn is_coordinated_sink(&self) -> bool {
        true
    }

    async fn new_coordinator(
        &self,
        iceberg_compact_stat_sender: Option<UnboundedSender<IcebergSinkCompactionUpdate>>,
    ) -> Result<SinkCommitCoordinator> {
        let catalog = self.config.create_catalog().await?;
        let table = self.create_and_validate_table().await?;
        let coordinator = IcebergSinkCommitter {
            catalog,
            table,
            last_commit_epoch: 0,
            sink_id: self.param.sink_id,
            config: self.config.clone(),
            param: self.param.clone(),
            commit_retry_num: self.config.commit_retry_num,
            iceberg_compact_stat_sender,
        };
        if self.config.is_exactly_once.unwrap_or_default() {
            Ok(SinkCommitCoordinator::TwoPhase(Box::new(coordinator)))
        } else {
            Ok(SinkCommitCoordinator::SinglePhase(Box::new(coordinator)))
        }
    }
}
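
/// Projection state for dropping the extra partition column (if any) from
/// incoming chunks: `None` needs no projection, `Prepare` holds the index of
/// the column to drop, and `Done` caches the computed projection vector.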
enum ProjectIdxVec {
    None,
    Prepare(usize),
    Done(Vec<usize>),
}

type DataFileWriterBuilderType =
    DataFileWriterBuilder<ParquetWriterBuilder, DefaultLocationGenerator, DefaultFileNameGenerator>;
type PositionDeleteFileWriterBuilderType = PositionDeleteFileWriterBuilder<
    ParquetWriterBuilder,
    DefaultLocationGenerator,
    DefaultFileNameGenerator,
>;
type PositionDeleteFileWriterType = PositionDeleteFileWriter<
    ParquetWriterBuilder,
    DefaultLocationGenerator,
    DefaultFileNameGenerator,
>;
type DeletionVectorWriterBuilderType =
    DeletionVectorWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>;
type DeletionVectorWriterType =
    DeletionVectorWriter<DefaultLocationGenerator, DefaultFileNameGenerator>;
type EqualityDeleteFileWriterBuilderType = EqualityDeleteFileWriterBuilder<
    ParquetWriterBuilder,
    DefaultLocationGenerator,
    DefaultFileNameGenerator,
>;
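
/// Row-level deletes are written either as position-delete files (format v2)
/// or as Puffin deletion vectors (format v3 and above); this enum dispatches
/// between the two writer builders behind one interface.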
#[derive(Clone)]
enum PositionDeleteWriterBuilderType {
    PositionDelete(PositionDeleteFileWriterBuilderType),
    DeletionVector(DeletionVectorWriterBuilderType),
}

enum PositionDeleteWriterType {
    PositionDelete(PositionDeleteFileWriterType),
    DeletionVector(DeletionVectorWriterType),
}

#[async_trait]
impl IcebergWriterBuilder<Vec<PositionDeleteInput>> for PositionDeleteWriterBuilderType {
    type R = PositionDeleteWriterType;

    async fn build(
        self,
        partition_key: Option<iceberg::spec::PartitionKey>,
    ) -> iceberg::Result<Self::R> {
        match self {
            PositionDeleteWriterBuilderType::PositionDelete(builder) => Ok(
                PositionDeleteWriterType::PositionDelete(builder.build(partition_key).await?),
            ),
            PositionDeleteWriterBuilderType::DeletionVector(builder) => Ok(
                PositionDeleteWriterType::DeletionVector(builder.build(partition_key).await?),
            ),
        }
    }
}

#[async_trait]
impl IcebergWriter<Vec<PositionDeleteInput>> for PositionDeleteWriterType {
    async fn write(&mut self, input: Vec<PositionDeleteInput>) -> iceberg::Result<()> {
        match self {
            PositionDeleteWriterType::PositionDelete(writer) => writer.write(input).await,
            PositionDeleteWriterType::DeletionVector(writer) => writer.write(input).await,
        }
    }

    async fn close(&mut self) -> iceberg::Result<Vec<DataFile>> {
        match self {
            PositionDeleteWriterType::PositionDelete(writer) => writer.close().await,
            PositionDeleteWriterType::DeletionVector(writer) => writer.close().await,
        }
    }
}
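
/// Wraps an [`IcebergWriterBuilder`] so that the built [`TaskWriter`] gets the
/// right partition splitter: none for unpartitioned tables, otherwise one that
/// either computes partition values or reuses precomputed ones.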
#[derive(Clone)]
struct TaskWriterBuilderWrapper<B: IcebergWriterBuilder> {
    inner: B,
    fanout_enabled: bool,
    schema: IcebergSchemaRef,
    partition_spec: PartitionSpecRef,
    compute_partition: bool,
}

impl<B: IcebergWriterBuilder> TaskWriterBuilderWrapper<B> {
    fn new(
        inner: B,
        fanout_enabled: bool,
        schema: IcebergSchemaRef,
        partition_spec: PartitionSpecRef,
        compute_partition: bool,
    ) -> Self {
        Self {
            inner,
            fanout_enabled,
            schema,
            partition_spec,
            compute_partition,
        }
    }

    fn build(self) -> iceberg::Result<TaskWriter<B>> {
        let partition_splitter = match (
            self.partition_spec.is_unpartitioned(),
            self.compute_partition,
        ) {
            (true, _) => None,
            (false, true) => Some(RecordBatchPartitionSplitter::new_with_computed_values(
                self.schema.clone(),
                self.partition_spec.clone(),
            )?),
            (false, false) => Some(RecordBatchPartitionSplitter::new_with_precomputed_values(
                self.schema.clone(),
                self.partition_spec.clone(),
            )?),
        };

        Ok(TaskWriter::new_with_partition_splitter(
            self.inner,
            self.fanout_enabled,
            self.schema,
            self.partition_spec,
            partition_splitter,
        ))
    }
}
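
/// The sink writer is initialized lazily: it starts as `Created`, holding the
/// raw arguments, and becomes `Initialized` on the first `begin_epoch` call,
/// once the target table has been loaded and validated.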
pub enum IcebergSinkWriter {
    Created(IcebergSinkWriterArgs),
    Initialized(IcebergSinkWriterInner),
}

pub struct IcebergSinkWriterArgs {
    config: IcebergConfig,
    sink_param: SinkParam,
    writer_param: SinkWriterParam,
    unique_column_ids: Option<Vec<usize>>,
}

pub struct IcebergSinkWriterInner {
    writer: IcebergWriterDispatch,
    arrow_schema: SchemaRef,
    metrics: IcebergWriterMetrics,
    table: Table,
    project_idx_vec: ProjectIdxVec,
}

#[allow(clippy::type_complexity)]
enum IcebergWriterDispatch {
    Append {
        writer: Option<Box<dyn IcebergWriter>>,
        writer_builder:
            TaskWriterBuilderWrapper<MonitoredGeneralWriterBuilder<DataFileWriterBuilderType>>,
    },
    Upsert {
        writer: Option<Box<dyn IcebergWriter>>,
        writer_builder: TaskWriterBuilderWrapper<
            MonitoredGeneralWriterBuilder<
                DeltaWriterBuilder<
                    DataFileWriterBuilderType,
                    PositionDeleteWriterBuilderType,
                    EqualityDeleteFileWriterBuilderType,
                >,
            >,
        >,
        arrow_schema_with_op_column: SchemaRef,
    },
}

impl IcebergWriterDispatch {
    pub fn get_writer(&mut self) -> Option<&mut Box<dyn IcebergWriter>> {
        match self {
            IcebergWriterDispatch::Append { writer, .. }
            | IcebergWriterDispatch::Upsert { writer, .. } => writer.as_mut(),
        }
    }
}

pub struct IcebergWriterMetrics {
    _write_qps: LabelGuardedIntCounter,
    _write_latency: LabelGuardedHistogram,
    write_bytes: LabelGuardedIntCounter,
}

impl IcebergSinkWriter {
    pub fn new(
        config: IcebergConfig,
        sink_param: SinkParam,
        writer_param: SinkWriterParam,
        unique_column_ids: Option<Vec<usize>>,
    ) -> Self {
        Self::Created(IcebergSinkWriterArgs {
            config,
            sink_param,
            writer_param,
            unique_column_ids,
        })
    }
}

impl IcebergSinkWriterInner {
    fn build_append_only(
        config: &IcebergConfig,
        table: Table,
        writer_param: &SinkWriterParam,
    ) -> Result<Self> {
        let SinkWriterParam {
            extra_partition_col_idx,
            actor_id,
            sink_id,
            sink_name,
            ..
        } = writer_param;
        let metrics_labels = [
            &actor_id.to_string(),
            &sink_id.to_string(),
            sink_name.as_str(),
        ];

        let write_qps = GLOBAL_SINK_METRICS
            .iceberg_write_qps
            .with_guarded_label_values(&metrics_labels);
        let write_latency = GLOBAL_SINK_METRICS
            .iceberg_write_latency
            .with_guarded_label_values(&metrics_labels);
        let _rolling_unflushed_data_file = GLOBAL_SINK_METRICS
            .iceberg_rolling_unflushed_data_file
            .with_guarded_label_values(&metrics_labels);
        let write_bytes = GLOBAL_SINK_METRICS
            .iceberg_write_bytes
            .with_guarded_label_values(&metrics_labels);

        let schema = table.metadata().current_schema();
        let partition_spec = table.metadata().default_partition_spec();
        let fanout_enabled = !partition_spec.fields().is_empty();
        let unique_uuid_suffix = Uuid::now_v7();

        let parquet_writer_properties = WriterProperties::builder()
            .set_compression(config.get_parquet_compression())
            .set_max_row_group_size(config.write_parquet_max_row_group_rows())
            .set_created_by(PARQUET_CREATED_BY.to_owned())
            .build();

        let parquet_writer_builder =
            ParquetWriterBuilder::new(parquet_writer_properties, schema.clone());
        let rolling_builder = RollingFileWriterBuilder::new(
            parquet_writer_builder,
            (config.target_file_size_mb() * 1024 * 1024) as usize,
            table.file_io().clone(),
            DefaultLocationGenerator::new(table.metadata().clone())
                .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
            DefaultFileNameGenerator::new(
                writer_param.actor_id.to_string(),
                Some(unique_uuid_suffix.to_string()),
                iceberg::spec::DataFileFormat::Parquet,
            ),
        );
        let data_file_builder = DataFileWriterBuilder::new(rolling_builder);
        let monitored_builder = MonitoredGeneralWriterBuilder::new(
            data_file_builder,
            write_qps.clone(),
            write_latency.clone(),
        );
        let writer_builder = TaskWriterBuilderWrapper::new(
            monitored_builder,
            fanout_enabled,
            schema.clone(),
            partition_spec.clone(),
            true,
        );
        let inner_writer = Some(Box::new(
            writer_builder
                .clone()
                .build()
                .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
        ) as Box<dyn IcebergWriter>);
        Ok(Self {
            arrow_schema: Arc::new(
                schema_to_arrow_schema(table.metadata().current_schema())
                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
            ),
            metrics: IcebergWriterMetrics {
                _write_qps: write_qps,
                _write_latency: write_latency,
                write_bytes,
            },
            writer: IcebergWriterDispatch::Append {
                writer: inner_writer,
                writer_builder,
            },
            table,
            project_idx_vec: {
                if let Some(extra_partition_col_idx) = extra_partition_col_idx {
                    ProjectIdxVec::Prepare(*extra_partition_col_idx)
                } else {
                    ProjectIdxVec::None
                }
            },
        })
    }

    fn build_upsert(
        config: &IcebergConfig,
        table: Table,
        unique_column_ids: Vec<usize>,
        writer_param: &SinkWriterParam,
    ) -> Result<Self> {
        let SinkWriterParam {
            extra_partition_col_idx,
            actor_id,
            sink_id,
            sink_name,
            ..
        } = writer_param;
        let metrics_labels = [
            &actor_id.to_string(),
            &sink_id.to_string(),
            sink_name.as_str(),
        ];
        let unique_column_ids: Vec<_> = unique_column_ids.into_iter().map(|id| id as i32).collect();

        let write_qps = GLOBAL_SINK_METRICS
            .iceberg_write_qps
            .with_guarded_label_values(&metrics_labels);
        let write_latency = GLOBAL_SINK_METRICS
            .iceberg_write_latency
            .with_guarded_label_values(&metrics_labels);
        let _rolling_unflushed_data_file = GLOBAL_SINK_METRICS
            .iceberg_rolling_unflushed_data_file
            .with_guarded_label_values(&metrics_labels);
        let write_bytes = GLOBAL_SINK_METRICS
            .iceberg_write_bytes
            .with_guarded_label_values(&metrics_labels);

        let schema = table.metadata().current_schema();
        let partition_spec = table.metadata().default_partition_spec();
        let fanout_enabled = !partition_spec.fields().is_empty();
        let use_deletion_vectors = table.metadata().format_version() >= FormatVersion::V3;

        let unique_uuid_suffix = Uuid::now_v7();

        let parquet_writer_properties = WriterProperties::builder()
            .set_compression(config.get_parquet_compression())
            .set_max_row_group_size(config.write_parquet_max_row_group_rows())
            .set_created_by(PARQUET_CREATED_BY.to_owned())
            .build();

        let data_file_builder = {
            let parquet_writer_builder =
                ParquetWriterBuilder::new(parquet_writer_properties.clone(), schema.clone());
            let rolling_writer_builder = RollingFileWriterBuilder::new(
                parquet_writer_builder,
                (config.target_file_size_mb() * 1024 * 1024) as usize,
                table.file_io().clone(),
                DefaultLocationGenerator::new(table.metadata().clone())
                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
                DefaultFileNameGenerator::new(
                    writer_param.actor_id.to_string(),
                    Some(unique_uuid_suffix.to_string()),
                    iceberg::spec::DataFileFormat::Parquet,
                ),
            );
            DataFileWriterBuilder::new(rolling_writer_builder)
        };
        let position_delete_builder = if use_deletion_vectors {
            let location_generator = DefaultLocationGenerator::new(table.metadata().clone())
                .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
            PositionDeleteWriterBuilderType::DeletionVector(DeletionVectorWriterBuilder::new(
                table.file_io().clone(),
                location_generator,
                DefaultFileNameGenerator::new(
                    writer_param.actor_id.to_string(),
                    Some(format!("delvec-{}", unique_uuid_suffix)),
                    iceberg::spec::DataFileFormat::Puffin,
                ),
            ))
        } else {
            let parquet_writer_builder = ParquetWriterBuilder::new(
                parquet_writer_properties.clone(),
                POSITION_DELETE_SCHEMA.clone().into(),
            );
            let rolling_writer_builder = RollingFileWriterBuilder::new(
                parquet_writer_builder,
                (config.target_file_size_mb() * 1024 * 1024) as usize,
                table.file_io().clone(),
                DefaultLocationGenerator::new(table.metadata().clone())
                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
                DefaultFileNameGenerator::new(
                    writer_param.actor_id.to_string(),
                    Some(format!("pos-del-{}", unique_uuid_suffix)),
                    iceberg::spec::DataFileFormat::Parquet,
                ),
            );
            PositionDeleteWriterBuilderType::PositionDelete(PositionDeleteFileWriterBuilder::new(
                rolling_writer_builder,
            ))
        };
        let equality_delete_builder = {
            let eq_del_config = EqualityDeleteWriterConfig::new(
                unique_column_ids.clone(),
                table.metadata().current_schema().clone(),
            )
            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
            let parquet_writer_builder = ParquetWriterBuilder::new(
                parquet_writer_properties,
                Arc::new(
                    arrow_schema_to_schema(eq_del_config.projected_arrow_schema_ref())
                        .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
                ),
            );
            let rolling_writer_builder = RollingFileWriterBuilder::new(
                parquet_writer_builder,
                (config.target_file_size_mb() * 1024 * 1024) as usize,
                table.file_io().clone(),
                DefaultLocationGenerator::new(table.metadata().clone())
                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
                DefaultFileNameGenerator::new(
                    writer_param.actor_id.to_string(),
                    Some(format!("eq-del-{}", unique_uuid_suffix)),
                    iceberg::spec::DataFileFormat::Parquet,
                ),
            );

            EqualityDeleteFileWriterBuilder::new(rolling_writer_builder, eq_del_config)
        };
        let delta_builder = DeltaWriterBuilder::new(
            data_file_builder,
            position_delete_builder,
            equality_delete_builder,
            unique_column_ids,
            schema.clone(),
        );
        let original_arrow_schema = Arc::new(
            schema_to_arrow_schema(table.metadata().current_schema())
                .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
        );
        let schema_with_extra_op_column = {
            let mut new_fields = original_arrow_schema.fields().iter().cloned().collect_vec();
            new_fields.push(Arc::new(ArrowField::new(
                "op".to_owned(),
                ArrowDataType::Int32,
                false,
            )));
            Arc::new(ArrowSchema::new(new_fields))
        };
        let writer_builder = TaskWriterBuilderWrapper::new(
            MonitoredGeneralWriterBuilder::new(
                delta_builder,
                write_qps.clone(),
                write_latency.clone(),
            ),
            fanout_enabled,
            schema.clone(),
            partition_spec.clone(),
            true,
        );
        let inner_writer = Some(Box::new(
            writer_builder
                .clone()
                .build()
                .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
        ) as Box<dyn IcebergWriter>);
        Ok(Self {
            arrow_schema: original_arrow_schema,
            metrics: IcebergWriterMetrics {
                _write_qps: write_qps,
                _write_latency: write_latency,
                write_bytes,
            },
            table,
            writer: IcebergWriterDispatch::Upsert {
                writer: inner_writer,
                writer_builder,
                arrow_schema_with_op_column: schema_with_extra_op_column,
            },
            project_idx_vec: {
                if let Some(extra_partition_col_idx) = extra_partition_col_idx {
                    ProjectIdxVec::Prepare(*extra_partition_col_idx)
                } else {
                    ProjectIdxVec::None
                }
            },
        })
    }
}

#[async_trait]
impl SinkWriter for IcebergSinkWriter {
    type CommitMetadata = Option<SinkMetadata>;

    async fn begin_epoch(&mut self, _epoch: u64) -> Result<()> {
        let Self::Created(args) = self else {
            return Ok(());
        };

        let table = create_and_validate_table_impl(&args.config, &args.sink_param).await?;
        let inner = match &args.unique_column_ids {
            Some(unique_column_ids) => IcebergSinkWriterInner::build_upsert(
                &args.config,
                table,
                unique_column_ids.clone(),
                &args.writer_param,
            )?,
            None => {
                IcebergSinkWriterInner::build_append_only(&args.config, table, &args.writer_param)?
            }
        };

        *self = IcebergSinkWriter::Initialized(inner);
        Ok(())
    }

    async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> {
        let Self::Initialized(inner) = self else {
            unreachable!("IcebergSinkWriter should be initialized before write_batch");
        };

        match &mut inner.writer {
            IcebergWriterDispatch::Append {
                writer,
                writer_builder,
            } => {
                if writer.is_none() {
                    *writer = Some(Box::new(
                        writer_builder
                            .clone()
                            .build()
                            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
                    ));
                }
            }
            IcebergWriterDispatch::Upsert {
                writer,
                writer_builder,
                ..
            } => {
                if writer.is_none() {
                    *writer = Some(Box::new(
                        writer_builder
                            .clone()
                            .build()
                            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
                    ));
                }
            }
        };

        let (mut chunk, ops) = chunk.compact_vis().into_parts();
        match &mut inner.project_idx_vec {
            ProjectIdxVec::None => {}
            ProjectIdxVec::Prepare(idx) => {
                if *idx >= chunk.columns().len() {
                    return Err(SinkError::Iceberg(anyhow!(
                        "invalid extra partition column index {}",
                        idx
                    )));
                }
                let project_idx_vec = (0..*idx)
                    .chain(*idx + 1..chunk.columns().len())
                    .collect_vec();
                chunk = chunk.project(&project_idx_vec);
                inner.project_idx_vec = ProjectIdxVec::Done(project_idx_vec);
            }
            ProjectIdxVec::Done(idx_vec) => {
                chunk = chunk.project(idx_vec);
            }
        }
        if ops.is_empty() {
            return Ok(());
        }
        let write_batch_size = chunk.estimated_heap_size();
        let batch = match &inner.writer {
            IcebergWriterDispatch::Append { .. } => {
                let filters =
                    chunk.visibility() & ops.iter().map(|op| *op == Op::Insert).collect::<Bitmap>();
                chunk.set_visibility(filters);
                IcebergArrowConvert
                    .to_record_batch(inner.arrow_schema.clone(), &chunk.compact_vis())
                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?
            }
            IcebergWriterDispatch::Upsert {
                arrow_schema_with_op_column,
                ..
            } => {
                let chunk = IcebergArrowConvert
                    .to_record_batch(inner.arrow_schema.clone(), &chunk)
                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
                let ops = Arc::new(Int32Array::from(
                    ops.iter()
                        .map(|op| match op {
                            Op::UpdateInsert | Op::Insert => INSERT_OP,
                            Op::UpdateDelete | Op::Delete => DELETE_OP,
                        })
                        .collect_vec(),
                ));
                let mut columns = chunk.columns().to_vec();
                columns.push(ops);
                RecordBatch::try_new(arrow_schema_with_op_column.clone(), columns)
                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?
            }
        };

        let writer = inner.writer.get_writer().unwrap();
        writer
            .write(batch)
            .instrument_await("iceberg_write")
            .await
            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
        inner.metrics.write_bytes.inc_by(write_batch_size as _);
        Ok(())
    }

    async fn barrier(&mut self, is_checkpoint: bool) -> Result<Option<SinkMetadata>> {
        let Self::Initialized(inner) = self else {
            unreachable!("IcebergSinkWriter should be initialized before barrier");
        };

        if !is_checkpoint {
            return Ok(None);
        }

        let close_result = match &mut inner.writer {
            IcebergWriterDispatch::Append {
                writer,
                writer_builder,
            } => {
                let close_result = match writer.take() {
                    Some(mut writer) => {
                        Some(writer.close().instrument_await("iceberg_close").await)
                    }
                    _ => None,
                };
                match writer_builder.clone().build() {
                    Ok(new_writer) => {
                        *writer = Some(Box::new(new_writer));
                    }
                    _ => {
                        warn!("Failed to build new writer after close");
                    }
                }
                close_result
            }
            IcebergWriterDispatch::Upsert {
                writer,
                writer_builder,
                ..
            } => {
                let close_result = match writer.take() {
                    Some(mut writer) => {
                        Some(writer.close().instrument_await("iceberg_close").await)
                    }
                    _ => None,
                };
                match writer_builder.clone().build() {
                    Ok(new_writer) => {
                        *writer = Some(Box::new(new_writer));
                    }
                    _ => {
                        warn!("Failed to build new writer after close");
                    }
                }
                close_result
            }
        };

        match close_result {
            Some(Ok(result)) => {
                let format_version = inner.table.metadata().format_version();
                let partition_type = inner.table.metadata().default_partition_type();
                let data_files = result
                    .into_iter()
                    .map(|f| {
                        let truncated = truncate_datafile(f);
                        SerializedDataFile::try_from(truncated, partition_type, format_version)
                            .map_err(|err| SinkError::Iceberg(anyhow!(err)))
                    })
                    .collect::<Result<Vec<_>>>()?;
                Ok(Some(SinkMetadata::try_from(&IcebergCommitResult {
                    data_files,
                    schema_id: inner.table.metadata().current_schema_id(),
                    partition_spec_id: inner.table.metadata().default_partition_spec_id(),
                })?))
            }
            Some(Err(err)) => Err(SinkError::Iceberg(anyhow!(err))),
            None => Err(SinkError::Iceberg(anyhow!("No writer to close"))),
        }
    }
}
const SCHEMA_ID: &str = "schema_id";
const PARTITION_SPEC_ID: &str = "partition_spec_id";
const DATA_FILES: &str = "data_files";
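
/// Maximum size in bytes for a serialized column statistic. Bounds larger
/// than this are dropped from the data file metadata (presumably to keep
/// commit metadata small).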
const MAX_COLUMN_STAT_SIZE: usize = 10240;

fn truncate_datafile(mut data_file: DataFile) -> DataFile {
    data_file.lower_bounds.retain(|field_id, datum| {
        let size = match datum.to_bytes() {
            Ok(bytes) => bytes.len(),
            Err(_) => 0,
        };

        if size > MAX_COLUMN_STAT_SIZE {
            tracing::debug!(
                field_id = field_id,
                size = size,
                "Truncating large lower_bound statistic"
            );
            return false;
        }
        true
    });

    data_file.upper_bounds.retain(|field_id, datum| {
        let size = match datum.to_bytes() {
            Ok(bytes) => bytes.len(),
            Err(_) => 0,
        };

        if size > MAX_COLUMN_STAT_SIZE {
            tracing::debug!(
                field_id = field_id,
                size = size,
                "Truncating large upper_bound statistic"
            );
            return false;
        }
        true
    });

    data_file
}
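
/// Per-checkpoint write result exchanged with the commit coordinator; it is
/// serialized as a JSON object with the `schema_id`, `partition_spec_id`, and
/// `data_files` keys defined above.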
1946#[derive(Default, Clone)]
1947struct IcebergCommitResult {
1948 schema_id: i32,
1949 partition_spec_id: i32,
1950 data_files: Vec<SerializedDataFile>,
1951}

impl IcebergCommitResult {
    fn try_from(value: &SinkMetadata) -> Result<Self> {
        if let Some(Serialized(v)) = &value.metadata {
            let mut values = if let serde_json::Value::Object(v) =
                serde_json::from_slice::<serde_json::Value>(&v.metadata)
                    .context("Can't parse iceberg sink metadata")?
            {
                v
            } else {
                bail!("iceberg sink metadata should be an object");
            };

            let schema_id;
            if let Some(serde_json::Value::Number(value)) = values.remove(SCHEMA_ID) {
                schema_id = value
                    .as_u64()
                    .ok_or_else(|| anyhow!("schema_id should be a u64"))?;
            } else {
                bail!("iceberg sink metadata should have schema_id");
            }

            let partition_spec_id;
            if let Some(serde_json::Value::Number(value)) = values.remove(PARTITION_SPEC_ID) {
                partition_spec_id = value
                    .as_u64()
                    .ok_or_else(|| anyhow!("partition_spec_id should be a u64"))?;
            } else {
                bail!("iceberg sink metadata should have partition_spec_id");
            }

            let data_files: Vec<SerializedDataFile>;
            if let serde_json::Value::Array(values) = values
                .remove(DATA_FILES)
                .ok_or_else(|| anyhow!("iceberg sink metadata should have data_files object"))?
            {
                data_files = values
                    .into_iter()
                    .map(from_value::<SerializedDataFile>)
                    .collect::<std::result::Result<_, _>>()
                    .context("Can't deserialize data_files in iceberg sink metadata")?;
            } else {
                bail!("iceberg sink metadata should have data_files object");
            }

            Ok(Self {
                schema_id: schema_id as i32,
                partition_spec_id: partition_spec_id as i32,
                data_files,
            })
        } else {
            bail!("Can't create iceberg sink write result from empty data!")
        }
    }

    fn try_from_serialized_bytes(value: Vec<u8>) -> Result<Self> {
        let mut values = if let serde_json::Value::Object(value) =
            serde_json::from_slice::<serde_json::Value>(&value)
                .context("Can't parse iceberg sink metadata")?
        {
            value
        } else {
            bail!("iceberg sink metadata should be an object");
        };

        let schema_id;
        if let Some(serde_json::Value::Number(value)) = values.remove(SCHEMA_ID) {
            schema_id = value
                .as_u64()
                .ok_or_else(|| anyhow!("schema_id should be a u64"))?;
        } else {
            bail!("iceberg sink metadata should have schema_id");
        }

        let partition_spec_id;
        if let Some(serde_json::Value::Number(value)) = values.remove(PARTITION_SPEC_ID) {
            partition_spec_id = value
                .as_u64()
                .ok_or_else(|| anyhow!("partition_spec_id should be a u64"))?;
        } else {
            bail!("iceberg sink metadata should have partition_spec_id");
        }

        let data_files: Vec<SerializedDataFile>;
        if let serde_json::Value::Array(values) = values
            .remove(DATA_FILES)
            .ok_or_else(|| anyhow!("iceberg sink metadata should have data_files object"))?
        {
            data_files = values
                .into_iter()
                .map(from_value::<SerializedDataFile>)
                .collect::<std::result::Result<_, _>>()
                .context("Can't deserialize data_files in iceberg sink metadata")?;
        } else {
            bail!("iceberg sink metadata should have data_files object");
        }

        Ok(Self {
            schema_id: schema_id as i32,
            partition_spec_id: partition_spec_id as i32,
            data_files,
        })
    }
}

impl<'a> TryFrom<&'a IcebergCommitResult> for SinkMetadata {
    type Error = SinkError;

    fn try_from(value: &'a IcebergCommitResult) -> std::result::Result<SinkMetadata, Self::Error> {
        let json_data_files = serde_json::Value::Array(
            value
                .data_files
                .iter()
                .map(serde_json::to_value)
                .collect::<std::result::Result<Vec<serde_json::Value>, _>>()
                .context("Can't serialize data files to json")?,
        );
        let json_value = serde_json::Value::Object(
            vec![
                (
                    SCHEMA_ID.to_owned(),
                    serde_json::Value::Number(value.schema_id.into()),
                ),
                (
                    PARTITION_SPEC_ID.to_owned(),
                    serde_json::Value::Number(value.partition_spec_id.into()),
                ),
                (DATA_FILES.to_owned(), json_data_files),
            ]
            .into_iter()
            .collect(),
        );
        Ok(SinkMetadata {
            metadata: Some(Serialized(SerializedMetadata {
                metadata: serde_json::to_vec(&json_value)
                    .context("Can't serialize iceberg sink metadata")?,
            })),
        })
    }
}

impl TryFrom<IcebergCommitResult> for Vec<u8> {
    type Error = SinkError;

    fn try_from(value: IcebergCommitResult) -> std::result::Result<Vec<u8>, Self::Error> {
        let json_data_files = serde_json::Value::Array(
            value
                .data_files
                .iter()
                .map(serde_json::to_value)
                .collect::<std::result::Result<Vec<serde_json::Value>, _>>()
                .context("Can't serialize data files to json")?,
        );
        let json_value = serde_json::Value::Object(
            vec![
                (
                    SCHEMA_ID.to_owned(),
                    serde_json::Value::Number(value.schema_id.into()),
                ),
                (
                    PARTITION_SPEC_ID.to_owned(),
                    serde_json::Value::Number(value.partition_spec_id.into()),
                ),
                (DATA_FILES.to_owned(), json_data_files),
            ]
            .into_iter()
            .collect(),
        );
        Ok(serde_json::to_vec(&json_value).context("Can't serialize iceberg sink metadata")?)
    }
}
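
/// Commit coordinator for the Iceberg sink. For each epoch it gathers the
/// per-writer [`IcebergCommitResult`]s, appends their data files to the table
/// in a single transaction, and applies any pending sink schema change.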
pub struct IcebergSinkCommitter {
    catalog: Arc<dyn Catalog>,
    table: Table,
    pub last_commit_epoch: u64,
    pub(crate) sink_id: SinkId,
    pub(crate) config: IcebergConfig,
    pub(crate) param: SinkParam,
    commit_retry_num: u32,
    pub(crate) iceberg_compact_stat_sender: Option<UnboundedSender<IcebergSinkCompactionUpdate>>,
}

impl IcebergSinkCommitter {
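    /// Loads the table from the catalog and verifies that its current schema
    /// id and default partition spec id still match the ones the data files
    /// were written against; schema or partition evolution mid-commit is
    /// rejected.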
    async fn reload_table(
        catalog: &dyn Catalog,
        table_ident: &TableIdent,
        schema_id: i32,
        partition_spec_id: i32,
    ) -> Result<Table> {
        let table = catalog
            .load_table(table_ident)
            .await
            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
        if table.metadata().current_schema_id() != schema_id {
            return Err(SinkError::Iceberg(anyhow!(
                "Schema evolution not supported, expect schema id {}, but got {}",
                schema_id,
                table.metadata().current_schema_id()
            )));
        }
        if table.metadata().default_partition_spec_id() != partition_spec_id {
            return Err(SinkError::Iceberg(anyhow!(
                "Partition evolution not supported, expect partition spec id {}, but got {}",
                partition_spec_id,
                table.metadata().default_partition_spec_id()
            )));
        }
        Ok(table)
    }
}

#[async_trait]
impl SinglePhaseCommitCoordinator for IcebergSinkCommitter {
    async fn init(&mut self) -> Result<()> {
        tracing::info!(
            sink_id = %self.param.sink_id,
            "Iceberg sink coordinator initialized",
        );

        Ok(())
    }

    async fn commit_data(&mut self, epoch: u64, metadata: Vec<SinkMetadata>) -> Result<()> {
        tracing::debug!("Starting iceberg direct commit in epoch {epoch}");

        if metadata.is_empty() {
            tracing::debug!(?epoch, "No datafile to commit");
            return Ok(());
        }

        if let Some((write_results, snapshot_id)) = self.pre_commit_inner(epoch, metadata)? {
            self.commit_data_impl(epoch, write_results, snapshot_id)
                .await?;
        }

        Ok(())
    }

    async fn commit_schema_change(
        &mut self,
        epoch: u64,
        schema_change: PbSinkSchemaChange,
    ) -> Result<()> {
        tracing::info!(
            "Committing schema change {:?} in epoch {}",
            schema_change,
            epoch
        );
        self.commit_schema_change_impl(schema_change).await?;
        tracing::info!("Successfully committed schema change in epoch {}", epoch);

        Ok(())
    }
}

#[async_trait]
impl TwoPhaseCommitCoordinator for IcebergSinkCommitter {
    async fn init(&mut self) -> Result<()> {
        tracing::info!(
            sink_id = %self.param.sink_id,
            "Iceberg sink coordinator initialized",
        );

        Ok(())
    }

    async fn pre_commit(
        &mut self,
        epoch: u64,
        metadata: Vec<SinkMetadata>,
        _schema_change: Option<PbSinkSchemaChange>,
    ) -> Result<Option<Vec<u8>>> {
        tracing::debug!("Starting iceberg pre commit in epoch {epoch}");

        let (write_results, snapshot_id) = match self.pre_commit_inner(epoch, metadata)? {
            Some((write_results, snapshot_id)) => (write_results, snapshot_id),
            None => {
                tracing::debug!(?epoch, "no data to pre commit");
                return Ok(None);
            }
        };

        let mut write_results_bytes = Vec::new();
        for each_parallelism_write_result in write_results {
            let each_parallelism_write_result_bytes: Vec<u8> =
                each_parallelism_write_result.try_into()?;
            write_results_bytes.push(each_parallelism_write_result_bytes);
        }

        let snapshot_id_bytes: Vec<u8> = snapshot_id.to_le_bytes().to_vec();
        write_results_bytes.push(snapshot_id_bytes);

        let pre_commit_metadata_bytes: Vec<u8> = serialize_metadata(write_results_bytes);
        Ok(Some(pre_commit_metadata_bytes))
    }

    async fn commit_data(&mut self, epoch: u64, commit_metadata: Vec<u8>) -> Result<()> {
        tracing::debug!("Starting iceberg commit in epoch {epoch}");

        if commit_metadata.is_empty() {
            tracing::debug!(?epoch, "No datafile to commit");
            return Ok(());
        }

        let mut payload = deserialize_metadata(commit_metadata);
        if payload.is_empty() {
            return Err(SinkError::Iceberg(anyhow!(
                "Invalid commit metadata: empty payload"
            )));
        }

        let snapshot_id_bytes = payload.pop().ok_or_else(|| {
            SinkError::Iceberg(anyhow!("Invalid commit metadata: missing snapshot_id"))
        })?;
        let snapshot_id = i64::from_le_bytes(
            snapshot_id_bytes
                .try_into()
                .map_err(|_| SinkError::Iceberg(anyhow!("Invalid snapshot id bytes")))?,
        );

        let write_results = payload
            .into_iter()
            .map(IcebergCommitResult::try_from_serialized_bytes)
            .collect::<Result<Vec<_>>>()?;

        let snapshot_committed = self
            .is_snapshot_id_in_iceberg(&self.config, snapshot_id)
            .await?;

        if snapshot_committed {
            tracing::info!(
                "Snapshot id {} already committed in iceberg table, skip committing again.",
                snapshot_id
            );
            return Ok(());
        }

        self.commit_data_impl(epoch, write_results, snapshot_id)
            .await
    }

    async fn commit_schema_change(
        &mut self,
        epoch: u64,
        schema_change: PbSinkSchemaChange,
    ) -> Result<()> {
        let schema_updated = self.check_schema_change_applied(&schema_change)?;
        if schema_updated {
            tracing::info!("Schema change already committed in epoch {}, skip", epoch);
            return Ok(());
        }

        tracing::info!(
            "Committing schema change {:?} in epoch {}",
            schema_change,
            epoch
        );
        self.commit_schema_change_impl(schema_change).await?;
        tracing::info!("Successfully committed schema change in epoch {epoch}");

        Ok(())
    }

    async fn abort(&mut self, _epoch: u64, _commit_metadata: Vec<u8>) {
        tracing::debug!("Abort not implemented yet");
    }
}

impl IcebergSinkCommitter {
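    /// Validates the per-writer results for an epoch and, when there is data
    /// to commit, pre-generates the snapshot id that the commit will use so
    /// that a replayed commit can later be detected as already applied.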
    fn pre_commit_inner(
        &mut self,
        _epoch: u64,
        metadata: Vec<SinkMetadata>,
    ) -> Result<Option<(Vec<IcebergCommitResult>, i64)>> {
        let write_results: Vec<IcebergCommitResult> = metadata
            .iter()
            .map(IcebergCommitResult::try_from)
            .collect::<Result<Vec<IcebergCommitResult>>>()?;

        if write_results.is_empty() || write_results.iter().all(|r| r.data_files.is_empty()) {
            return Ok(None);
        }

        let expect_schema_id = write_results[0].schema_id;
        let expect_partition_spec_id = write_results[0].partition_spec_id;

        if write_results
            .iter()
            .any(|r| r.schema_id != expect_schema_id)
            || write_results
                .iter()
                .any(|r| r.partition_spec_id != expect_partition_spec_id)
        {
            return Err(SinkError::Iceberg(anyhow!(
                "schema_id and partition_spec_id should be the same in all write results"
            )));
        }

        let snapshot_id = FastAppendAction::generate_snapshot_id(&self.table);

        Ok(Some((write_results, snapshot_id)))
    }

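    /// Converts the serialized data files back into [`DataFile`]s against the
    /// expected schema and partition spec, then commits them as a fast-append
    /// with the pre-generated snapshot id, retrying transient commit failures
    /// with jittered exponential backoff.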
    async fn commit_data_impl(
        &mut self,
        epoch: u64,
        write_results: Vec<IcebergCommitResult>,
        snapshot_id: i64,
    ) -> Result<()> {
        assert!(
            !write_results.is_empty() && !write_results.iter().all(|r| r.data_files.is_empty())
        );

        self.wait_for_snapshot_limit().await?;

        let expect_schema_id = write_results[0].schema_id;
        let expect_partition_spec_id = write_results[0].partition_spec_id;

        self.table = Self::reload_table(
            self.catalog.as_ref(),
            self.table.identifier(),
            expect_schema_id,
            expect_partition_spec_id,
        )
        .await?;

        let Some(schema) = self.table.metadata().schema_by_id(expect_schema_id) else {
            return Err(SinkError::Iceberg(anyhow!(
                "Can't find schema by id {}",
                expect_schema_id
            )));
        };
        let Some(partition_spec) = self
            .table
            .metadata()
            .partition_spec_by_id(expect_partition_spec_id)
        else {
            return Err(SinkError::Iceberg(anyhow!(
                "Can't find partition spec by id {}",
                expect_partition_spec_id
            )));
        };
        let partition_type = partition_spec
            .as_ref()
            .clone()
            .partition_type(schema)
            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;

        let data_files = write_results
            .into_iter()
            .flat_map(|r| {
                r.data_files.into_iter().map(|f| {
                    f.try_into(expect_partition_spec_id, &partition_type, schema)
                        .map_err(|err| SinkError::Iceberg(anyhow!(err)))
                })
            })
            .collect::<Result<Vec<DataFile>>>()?;

        // Retry commits with jittered exponential backoff, up to `commit_retry_num` attempts.
        let retry_strategy = ExponentialBackoff::from_millis(10)
            .max_delay(Duration::from_secs(60))
            .map(jitter)
            .take(self.commit_retry_num as usize);
        let catalog = self.catalog.clone();
        let table_ident = self.table.identifier().clone();

        enum CommitError {
            // Reloading the table failed; treated as non-retriable.
            ReloadTable(SinkError),
            // Applying or committing the transaction failed; retried.
            Commit(SinkError),
        }

        let table = RetryIf::spawn(
            retry_strategy,
            || async {
                let table = Self::reload_table(
                    catalog.as_ref(),
                    &table_ident,
                    expect_schema_id,
                    expect_partition_spec_id,
                )
                .await
                .map_err(|e| {
                    tracing::error!(error = %e.as_report(), "Failed to reload iceberg table");
                    CommitError::ReloadTable(e)
                })?;

                let txn = Transaction::new(&table);
                let append_action = txn
                    .fast_append()
                    .set_snapshot_id(snapshot_id)
                    .set_target_branch(commit_branch(
                        self.config.r#type.as_str(),
                        self.config.write_mode,
                    ))
                    .add_data_files(data_files.clone());

                let tx = append_action.apply(txn).map_err(|err| {
                    let err: IcebergError = err.into();
                    tracing::error!(error = %err.as_report(), "Failed to apply iceberg table");
                    CommitError::Commit(SinkError::Iceberg(anyhow!(err)))
                })?;

                tx.commit(catalog.as_ref()).await.map_err(|err| {
                    let err: IcebergError = err.into();
                    tracing::error!(error = %err.as_report(), "Failed to commit iceberg table");
                    CommitError::Commit(SinkError::Iceberg(anyhow!(err)))
                })
            },
            |err: &CommitError| {
                match err {
                    CommitError::Commit(_) => {
                        tracing::warn!("Commit failed, will retry");
                        true
                    }
                    CommitError::ReloadTable(_) => {
                        tracing::error!(
                            "reload_table failed with non-retriable error, will not retry"
                        );
                        false
                    }
                }
            },
        )
        .await
        .map_err(|e| match e {
            CommitError::ReloadTable(e) | CommitError::Commit(e) => e,
        })?;
        self.table = table;

        let snapshot_num = self.table.metadata().snapshots().count();
        let catalog_name = self.config.common.catalog_name();
        let table_name = self.table.identifier().to_string();
        let metrics_labels = [&self.param.sink_name, &catalog_name, &table_name];
        GLOBAL_SINK_METRICS
            .iceberg_snapshot_num
            .with_guarded_label_values(&metrics_labels)
            .set(snapshot_num as i64);

        tracing::debug!("Succeeded to commit to iceberg table in epoch {epoch}.");

        if let Some(iceberg_compact_stat_sender) = &self.iceberg_compact_stat_sender
            && self.config.enable_compaction
            && iceberg_compact_stat_sender
                .send(IcebergSinkCompactionUpdate {
                    sink_id: self.sink_id,
                    compaction_interval: self.config.compaction_interval_sec(),
                    force_compaction: false,
                })
                .is_err()
        {
            warn!("failed to send iceberg compaction stats");
        }

        Ok(())
    }

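    /// Idempotence check for two-phase commit: if the pre-generated snapshot
    /// id already exists in the table, this epoch was committed before and
    /// the replay can be skipped.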
    async fn is_snapshot_id_in_iceberg(
        &self,
        iceberg_config: &IcebergConfig,
        snapshot_id: i64,
    ) -> Result<bool> {
        let table = iceberg_config.load_table().await?;
        Ok(table.metadata().snapshot_by_id(snapshot_id).is_some())
    }

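    /// Decides whether a pending schema change has already been applied by
    /// comparing the table's current schema against `original_schema` and
    /// against `original_schema + add_columns`; errors if it matches neither.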
    fn check_schema_change_applied(&self, schema_change: &PbSinkSchemaChange) -> Result<bool> {
        let current_schema = self.table.metadata().current_schema();
        let current_arrow_schema = schema_to_arrow_schema(current_schema.as_ref())
            .context("Failed to convert schema")
            .map_err(SinkError::Iceberg)?;

        let iceberg_arrow_convert = IcebergArrowConvert;

        let schema_matches = |expected: &[ArrowField]| {
            if current_arrow_schema.fields().len() != expected.len() {
                return false;
            }

            expected.iter().all(|expected_field| {
                current_arrow_schema.fields().iter().any(|current_field| {
                    current_field.name() == expected_field.name()
                        && current_field.data_type() == expected_field.data_type()
                })
            })
        };

        let original_arrow_fields: Vec<ArrowField> = schema_change
            .original_schema
            .iter()
            .map(|pb_field| {
                let field = Field::from(pb_field);
                iceberg_arrow_convert
                    .to_arrow_field(&field.name, &field.data_type)
                    .context("Failed to convert field to arrow")
                    .map_err(SinkError::Iceberg)
            })
            .collect::<Result<_>>()?;

        if schema_matches(&original_arrow_fields) {
            tracing::debug!(
                "Current iceberg schema matches original_schema ({} columns); schema change not applied",
                original_arrow_fields.len()
            );
            return Ok(false);
        }

        let Some(risingwave_pb::stream_plan::sink_schema_change::Op::AddColumns(add_columns_op)) =
            schema_change.op.as_ref()
        else {
            return Err(SinkError::Iceberg(anyhow!(
                "Unsupported sink schema change op in iceberg sink: {:?}",
                schema_change.op
            )));
        };

        let add_arrow_fields: Vec<ArrowField> = add_columns_op
            .fields
            .iter()
            .map(|pb_field| {
                let field = Field::from(pb_field);
                iceberg_arrow_convert
                    .to_arrow_field(&field.name, &field.data_type)
                    .context("Failed to convert field to arrow")
                    .map_err(SinkError::Iceberg)
            })
            .collect::<Result<_>>()?;

        let mut expected_after_change = original_arrow_fields;
        expected_after_change.extend(add_arrow_fields);

        if schema_matches(&expected_after_change) {
            tracing::debug!(
                "Current iceberg schema matches original_schema + add_columns ({} columns); schema change already applied",
                expected_after_change.len()
            );
            return Ok(true);
        }

        Err(SinkError::Iceberg(anyhow!(
            "Current iceberg schema does not match either original_schema ({} cols) or original_schema + add_columns; cannot determine whether schema change is applied",
            schema_change.original_schema.len()
        )))
    }

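    /// Applies an `AddColumns` schema change by allocating fresh field ids
    /// and committing an `update_schema` transaction; all added columns are
    /// created as optional.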
    async fn commit_schema_change_impl(&mut self, schema_change: PbSinkSchemaChange) -> Result<()> {
        use iceberg::spec::NestedField;

        let Some(risingwave_pb::stream_plan::sink_schema_change::Op::AddColumns(add_columns_op)) =
            schema_change.op.as_ref()
        else {
            return Err(SinkError::Iceberg(anyhow!(
                "Unsupported sink schema change op in iceberg sink: {:?}",
                schema_change.op
            )));
        };

        let add_columns = add_columns_op.fields.iter().map(Field::from).collect_vec();

        let metadata = self.table.metadata();
        let mut next_field_id = metadata.last_column_id() + 1;
        tracing::debug!("Starting schema change, next_field_id: {}", next_field_id);

        let iceberg_create_table_arrow_convert = IcebergCreateTableArrowConvert::default();
        let mut new_fields = Vec::new();

        for field in &add_columns {
            let arrow_field = iceberg_create_table_arrow_convert
                .to_arrow_field(&field.name, &field.data_type)
                .with_context(|| format!("Failed to convert field '{}' to arrow", field.name))
                .map_err(SinkError::Iceberg)?;

            let iceberg_type = iceberg::arrow::arrow_type_to_type(arrow_field.data_type())
                .map_err(|err| {
                    SinkError::Iceberg(
                        anyhow!(err).context("Failed to convert Arrow type to Iceberg type"),
                    )
                })?;

            let nested_field = Arc::new(NestedField::optional(
                next_field_id,
                &field.name,
                iceberg_type,
            ));

            new_fields.push(nested_field);
            tracing::info!("Prepared field '{}' with ID {}", field.name, next_field_id);
            next_field_id += 1;
        }

        tracing::info!(
            "Committing schema change to catalog for table {}",
            self.table.identifier()
        );

        let txn = Transaction::new(&self.table);
        let action = txn.update_schema().add_fields(new_fields);

        let updated_table = action
            .apply(txn)
            .context("Failed to apply schema update action")
            .map_err(SinkError::Iceberg)?
            .commit(self.catalog.as_ref())
            .await
            .context("Failed to commit table schema change")
            .map_err(SinkError::Iceberg)?;

        self.table = updated_table;

        tracing::info!(
            "Successfully committed schema change, added {} columns to iceberg table",
            add_columns.len()
        );

        Ok(())
    }

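    /// Counts the snapshots committed since the most recent `Replace`
    /// (compaction rewrite) snapshot, scanning from newest to oldest.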
    fn count_snapshots_since_rewrite(&self) -> usize {
        let mut snapshots: Vec<_> = self.table.metadata().snapshots().collect();
        snapshots.sort_by_key(|b| std::cmp::Reverse(b.timestamp_ms()));

        let mut count = 0;
        for snapshot in snapshots {
            let summary = snapshot.summary();
            match &summary.operation {
                Operation::Replace => {
                    break;
                }
                _ => {
                    count += 1;
                }
            }
        }

        count
    }

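    /// Back-pressure for compaction: while the number of snapshots since the
    /// last rewrite has reached `max_snapshots_num_before_compaction`,
    /// request a forced compaction and re-check every 30 seconds before
    /// letting the commit proceed.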
    async fn wait_for_snapshot_limit(&mut self) -> Result<()> {
        if !self.config.enable_compaction {
            return Ok(());
        }

        if let Some(max_snapshots) = self.config.max_snapshots_num_before_compaction {
            loop {
                let current_count = self.count_snapshots_since_rewrite();

                if current_count < max_snapshots {
                    tracing::info!(
                        "Snapshot count check passed: {} < {}",
                        current_count,
                        max_snapshots
                    );
                    break;
                }

                tracing::info!(
                    "Snapshot count {} has reached the limit {}, waiting...",
                    current_count,
                    max_snapshots
                );

                if let Some(iceberg_compact_stat_sender) = &self.iceberg_compact_stat_sender
                    && iceberg_compact_stat_sender
                        .send(IcebergSinkCompactionUpdate {
                            sink_id: self.sink_id,
                            compaction_interval: self.config.compaction_interval_sec(),
                            force_compaction: true,
                        })
                        .is_err()
                {
                    tracing::warn!("failed to send iceberg compaction stats");
                }

                tokio::time::sleep(Duration::from_secs(30)).await;

                // Reload the table so the next iteration sees fresh snapshot metadata.
                self.table = self.config.load_table().await?;
            }
        }
        Ok(())
    }
}

const MAP_KEY: &str = "key";
const MAP_VALUE: &str = "value";

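/// Collects the child types of a RisingWave struct/map/list into
/// `schema_fields` and returns the matching Arrow struct fields when the
/// Arrow side is (or wraps) a struct; returns `None` for non-nested types.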
fn get_fields<'a>(
    our_field_type: &'a risingwave_common::types::DataType,
    data_type: &ArrowDataType,
    schema_fields: &mut HashMap<&'a str, &'a risingwave_common::types::DataType>,
) -> Option<ArrowFields> {
    match data_type {
        ArrowDataType::Struct(fields) => {
            match our_field_type {
                risingwave_common::types::DataType::Struct(struct_fields) => {
                    struct_fields.iter().for_each(|(name, data_type)| {
                        let res = schema_fields.insert(name, data_type);
                        assert!(res.is_none())
                    });
                }
                risingwave_common::types::DataType::Map(map_fields) => {
                    schema_fields.insert(MAP_KEY, map_fields.key());
                    schema_fields.insert(MAP_VALUE, map_fields.value());
                }
                risingwave_common::types::DataType::List(list) => {
                    list.elem()
                        .as_struct()
                        .iter()
                        .for_each(|(name, data_type)| {
                            let res = schema_fields.insert(name, data_type);
                            assert!(res.is_none())
                        });
                }
                _ => {}
            };
            Some(fields.clone())
        }
        ArrowDataType::List(field) | ArrowDataType::Map(field, _) => {
            get_fields(our_field_type, field.data_type(), schema_fields)
        }
        _ => None,
    }
}

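/// Checks that every Arrow field has a RisingWave counterpart whose converted
/// Arrow type is identical or explicitly allowed to differ (decimal
/// precision/scale, Binary vs. LargeBinary), recursing into lists, maps, and
/// structs.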
fn check_compatibility(
    schema_fields: HashMap<&str, &risingwave_common::types::DataType>,
    fields: &ArrowFields,
) -> anyhow::Result<bool> {
    for arrow_field in fields {
        let our_field_type = schema_fields
            .get(arrow_field.name().as_str())
            .ok_or_else(|| anyhow!("Field {} not found in our schema", arrow_field.name()))?;

        let converted_arrow_data_type = IcebergArrowConvert
            .to_arrow_field("", our_field_type)
            .map_err(|e| anyhow!(e))?
            .data_type()
            .clone();

        let compatible = match (&converted_arrow_data_type, arrow_field.data_type()) {
            (ArrowDataType::Decimal128(_, _), ArrowDataType::Decimal128(_, _)) => true,
            (ArrowDataType::Binary, ArrowDataType::LargeBinary) => true,
            (ArrowDataType::LargeBinary, ArrowDataType::Binary) => true,
            (ArrowDataType::List(_), ArrowDataType::List(field))
            | (ArrowDataType::Map(_, _), ArrowDataType::Map(field, _)) => {
                let mut schema_fields = HashMap::new();
                get_fields(our_field_type, field.data_type(), &mut schema_fields)
                    .is_none_or(|fields| check_compatibility(schema_fields, &fields).unwrap())
            }
            (ArrowDataType::Struct(_), ArrowDataType::Struct(fields)) => {
                let mut schema_fields = HashMap::new();
                our_field_type
                    .as_struct()
                    .iter()
                    .for_each(|(name, data_type)| {
                        let res = schema_fields.insert(name, data_type);
                        assert!(res.is_none())
                    });
                check_compatibility(schema_fields, fields)?
            }
            (left, right) => left.equals_datatype(right),
        };
        if !compatible {
            bail!(
                "field {}'s type is incompatible\nRisingWave converted data type: {}\niceberg's data type: {}",
                arrow_field.name(),
                converted_arrow_data_type,
                arrow_field.data_type()
            );
        }
    }
    Ok(true)
}

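/// Verifies that a RisingWave schema and an Iceberg table's Arrow schema
/// contain the same fields with compatible types; field order is ignored.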
pub fn try_matches_arrow_schema(rw_schema: &Schema, arrow_schema: &ArrowSchema) -> Result<()> {
    if rw_schema.fields.len() != arrow_schema.fields().len() {
        bail!(
            "Schema length mismatch, risingwave is {}, and iceberg is {}",
            rw_schema.fields.len(),
            arrow_schema.fields.len()
        );
    }

    let mut schema_fields = HashMap::new();
    rw_schema.fields.iter().for_each(|field| {
        let res = schema_fields.insert(field.name.as_str(), &field.data_type);
        assert!(res.is_none())
    });

    check_compatibility(schema_fields, &arrow_schema.fields)?;
    Ok(())
}

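/// JSON-encodes the per-writer metadata blobs (the last of which carries the
/// snapshot id; see `pre_commit`) into a single payload for the commit log.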
fn serialize_metadata(metadata: Vec<Vec<u8>>) -> Vec<u8> {
    serde_json::to_vec(&metadata).unwrap()
}

fn deserialize_metadata(bytes: Vec<u8>) -> Vec<Vec<u8>> {
    serde_json::from_slice(&bytes).unwrap()
}

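/// Parses a `partition_by` clause such as
/// `"v1, identity(v1), truncate(4,v2), bucket(5,v1)"` into
/// `(column, Transform)` pairs. A bare column name maps to
/// `Transform::Identity`; `bucket` and `truncate` require the numeric
/// argument `n`.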
pub fn parse_partition_by_exprs(
    expr: String,
) -> std::result::Result<Vec<(String, Transform)>, anyhow::Error> {
    let re = Regex::new(r"(?<transform>\w+)(\(((?<n>\d+)?(?:,|(,\s)))?(?<field>\w+)\))?").unwrap();
    if !re.is_match(&expr) {
        bail!(format!(
            "Invalid partition fields: {}\nHINT: Supported formats are column, transform(column), transform(n,column), transform(n, column)",
            expr
        ))
    }
    let caps = re.captures_iter(&expr);

    let mut partition_columns = vec![];

    for mat in caps {
        let (column, transform) = if mat.name("n").is_none() && mat.name("field").is_none() {
            (&mat["transform"], Transform::Identity)
        } else {
            let mut func = mat["transform"].to_owned();
            if func == "bucket" || func == "truncate" {
                let n = &mat
                    .name("n")
                    .ok_or_else(|| anyhow!("The `n` must be set with `bucket` and `truncate`"))?
                    .as_str();
                func = format!("{func}[{n}]");
            }
            (
                &mat["field"],
                Transform::from_str(&func)
                    .with_context(|| format!("invalid transform function {}", func))?,
            )
        };
        partition_columns.push((column.to_owned(), transform));
    }
    Ok(partition_columns)
}

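/// Commits for upsert sinks in copy-on-write mode target the dedicated
/// copy-on-write branch; everything else goes to the main branch.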
pub fn commit_branch(sink_type: &str, write_mode: IcebergWriteMode) -> String {
    if should_enable_iceberg_cow(sink_type, write_mode) {
        ICEBERG_COW_BRANCH.to_owned()
    } else {
        MAIN_BRANCH.to_owned()
    }
}

pub fn should_enable_iceberg_cow(sink_type: &str, write_mode: IcebergWriteMode) -> bool {
    sink_type == SINK_TYPE_UPSERT && write_mode == IcebergWriteMode::CopyOnWrite
}

impl crate::with_options::WithOptions for IcebergWriteMode {}

impl crate::with_options::WithOptions for FormatVersion {}

impl crate::with_options::WithOptions for CompactionType {}

#[cfg(test)]
mod test {
    use std::collections::BTreeMap;

    use risingwave_common::array::arrow::arrow_schema_iceberg::FieldRef as ArrowFieldRef;
    use risingwave_common::types::{DataType, MapType, StructType};

    use crate::connector_common::{IcebergCommon, IcebergTableIdentifier};
    use crate::sink::decouple_checkpoint_log_sink::ICEBERG_DEFAULT_COMMIT_CHECKPOINT_INTERVAL;
    use crate::sink::iceberg::{
        COMPACTION_INTERVAL_SEC, COMPACTION_MAX_SNAPSHOTS_NUM, CompactionType, ENABLE_COMPACTION,
        ENABLE_SNAPSHOT_EXPIRATION, FormatVersion, IcebergConfig, IcebergWriteMode,
        SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_FILES, SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_META_DATA,
        SNAPSHOT_EXPIRATION_MAX_AGE_MILLIS, SNAPSHOT_EXPIRATION_RETAIN_LAST, WRITE_MODE,
    };

    pub const DEFAULT_ICEBERG_COMPACTION_INTERVAL: u64 = 3600;

    #[test]
    fn test_compatible_arrow_schema() {
        use arrow_schema_iceberg::{DataType as ArrowDataType, Field as ArrowField};

        use super::*;
        let risingwave_schema = Schema::new(vec![
            Field::with_name(DataType::Int32, "a"),
            Field::with_name(DataType::Int32, "b"),
            Field::with_name(DataType::Int32, "c"),
        ]);
        let arrow_schema = ArrowSchema::new(vec![
            ArrowField::new("a", ArrowDataType::Int32, false),
            ArrowField::new("b", ArrowDataType::Int32, false),
            ArrowField::new("c", ArrowDataType::Int32, false),
        ]);

        try_matches_arrow_schema(&risingwave_schema, &arrow_schema).unwrap();

        let risingwave_schema = Schema::new(vec![
            Field::with_name(DataType::Int32, "d"),
            Field::with_name(DataType::Int32, "c"),
            Field::with_name(DataType::Int32, "a"),
            Field::with_name(DataType::Int32, "b"),
        ]);
        let arrow_schema = ArrowSchema::new(vec![
            ArrowField::new("a", ArrowDataType::Int32, false),
            ArrowField::new("b", ArrowDataType::Int32, false),
            ArrowField::new("d", ArrowDataType::Int32, false),
            ArrowField::new("c", ArrowDataType::Int32, false),
        ]);
        try_matches_arrow_schema(&risingwave_schema, &arrow_schema).unwrap();

        let risingwave_schema = Schema::new(vec![
            Field::with_name(
                DataType::Struct(StructType::new(vec![
                    ("a1", DataType::Int32),
                    (
                        "a2",
                        DataType::Struct(StructType::new(vec![
                            ("a21", DataType::Bytea),
                            (
                                "a22",
                                DataType::Map(MapType::from_kv(DataType::Varchar, DataType::Jsonb)),
                            ),
                        ])),
                    ),
                ])),
                "a",
            ),
            Field::with_name(
                DataType::list(DataType::Struct(StructType::new(vec![
                    ("b1", DataType::Int32),
                    ("b2", DataType::Bytea),
                    (
                        "b3",
                        DataType::Map(MapType::from_kv(DataType::Varchar, DataType::Jsonb)),
                    ),
                ]))),
                "b",
            ),
            Field::with_name(
                DataType::Map(MapType::from_kv(
                    DataType::Varchar,
                    DataType::list(DataType::Struct(StructType::new([
                        ("c1", DataType::Int32),
                        ("c2", DataType::Bytea),
                        (
                            "c3",
                            DataType::Map(MapType::from_kv(DataType::Varchar, DataType::Jsonb)),
                        ),
                    ]))),
                )),
                "c",
            ),
        ]);
        let arrow_schema = ArrowSchema::new(vec![
            ArrowField::new(
                "a",
                ArrowDataType::Struct(ArrowFields::from(vec![
                    ArrowField::new("a1", ArrowDataType::Int32, false),
                    ArrowField::new(
                        "a2",
                        ArrowDataType::Struct(ArrowFields::from(vec![
                            ArrowField::new("a21", ArrowDataType::LargeBinary, false),
                            ArrowField::new_map(
                                "a22",
                                "entries",
                                ArrowFieldRef::new(ArrowField::new(
                                    "key",
                                    ArrowDataType::Utf8,
                                    false,
                                )),
                                ArrowFieldRef::new(ArrowField::new(
                                    "value",
                                    ArrowDataType::Utf8,
                                    false,
                                )),
                                false,
                                false,
                            ),
                        ])),
                        false,
                    ),
                ])),
                false,
            ),
            ArrowField::new(
                "b",
                ArrowDataType::List(ArrowFieldRef::new(ArrowField::new_list_field(
                    ArrowDataType::Struct(ArrowFields::from(vec![
                        ArrowField::new("b1", ArrowDataType::Int32, false),
                        ArrowField::new("b2", ArrowDataType::LargeBinary, false),
                        ArrowField::new_map(
                            "b3",
                            "entries",
                            ArrowFieldRef::new(ArrowField::new("key", ArrowDataType::Utf8, false)),
                            ArrowFieldRef::new(ArrowField::new(
                                "value",
                                ArrowDataType::Utf8,
                                false,
                            )),
                            false,
                            false,
                        ),
                    ])),
                    false,
                ))),
                false,
            ),
            ArrowField::new_map(
                "c",
                "entries",
                ArrowFieldRef::new(ArrowField::new("key", ArrowDataType::Utf8, false)),
                ArrowFieldRef::new(ArrowField::new(
                    "value",
                    ArrowDataType::List(ArrowFieldRef::new(ArrowField::new_list_field(
                        ArrowDataType::Struct(ArrowFields::from(vec![
                            ArrowField::new("c1", ArrowDataType::Int32, false),
                            ArrowField::new("c2", ArrowDataType::LargeBinary, false),
                            ArrowField::new_map(
                                "c3",
                                "entries",
                                ArrowFieldRef::new(ArrowField::new(
                                    "key",
                                    ArrowDataType::Utf8,
                                    false,
                                )),
                                ArrowFieldRef::new(ArrowField::new(
                                    "value",
                                    ArrowDataType::Utf8,
                                    false,
                                )),
                                false,
                                false,
                            ),
                        ])),
                        false,
                    ))),
                    false,
                )),
                false,
                false,
            ),
        ]);
        try_matches_arrow_schema(&risingwave_schema, &arrow_schema).unwrap();
    }

    #[test]
    fn test_parse_iceberg_config() {
        let values = [
            ("connector", "iceberg"),
            ("type", "upsert"),
            ("primary_key", "v1"),
            ("partition_by", "v1, identity(v1), truncate(4,v2), bucket(5,v1), year(v3), month(v4), day(v5), hour(v6), void(v1)"),
            ("warehouse.path", "s3://iceberg"),
            ("s3.endpoint", "http://127.0.0.1:9301"),
            ("s3.access.key", "hummockadmin"),
            ("s3.secret.key", "hummockadmin"),
            ("s3.path.style.access", "true"),
            ("s3.region", "us-east-1"),
            ("catalog.type", "jdbc"),
            ("catalog.name", "demo"),
            ("catalog.uri", "jdbc://postgresql://postgres:5432/iceberg"),
            ("catalog.jdbc.user", "admin"),
            ("catalog.jdbc.password", "123456"),
            ("database.name", "demo_db"),
            ("table.name", "demo_table"),
            ("enable_compaction", "true"),
            ("compaction_interval_sec", "1800"),
            ("enable_snapshot_expiration", "true"),
        ]
        .into_iter()
        .map(|(k, v)| (k.to_owned(), v.to_owned()))
        .collect();

        let iceberg_config = IcebergConfig::from_btreemap(values).unwrap();

        let expected_iceberg_config = IcebergConfig {
            common: IcebergCommon {
                warehouse_path: Some("s3://iceberg".to_owned()),
                catalog_uri: Some("jdbc://postgresql://postgres:5432/iceberg".to_owned()),
                s3_region: Some("us-east-1".to_owned()),
                s3_endpoint: Some("http://127.0.0.1:9301".to_owned()),
                s3_access_key: Some("hummockadmin".to_owned()),
                s3_secret_key: Some("hummockadmin".to_owned()),
                s3_iam_role_arn: None,
                gcs_credential: None,
                catalog_type: Some("jdbc".to_owned()),
                glue_id: None,
                glue_region: None,
                glue_access_key: None,
                glue_secret_key: None,
                glue_iam_role_arn: None,
                catalog_name: Some("demo".to_owned()),
                s3_path_style_access: Some(true),
                catalog_credential: None,
                catalog_oauth2_server_uri: None,
                catalog_scope: None,
                catalog_token: None,
                enable_config_load: None,
                rest_signing_name: None,
                rest_signing_region: None,
                rest_sigv4_enabled: None,
                hosted_catalog: None,
                azblob_account_name: None,
                azblob_account_key: None,
                azblob_endpoint_url: None,
                catalog_header: None,
                adlsgen2_account_name: None,
                adlsgen2_account_key: None,
                adlsgen2_endpoint: None,
                vended_credentials: None,
                catalog_security: None,
                gcp_auth_scopes: None,
                catalog_io_impl: None,
            },
            table: IcebergTableIdentifier {
                database_name: Some("demo_db".to_owned()),
                table_name: "demo_table".to_owned(),
            },
            r#type: "upsert".to_owned(),
            force_append_only: false,
            primary_key: Some(vec!["v1".to_owned()]),
            partition_by: Some("v1, identity(v1), truncate(4,v2), bucket(5,v1), year(v3), month(v4), day(v5), hour(v6), void(v1)".to_owned()),
            java_catalog_props: [("jdbc.user", "admin"), ("jdbc.password", "123456")]
                .into_iter()
                .map(|(k, v)| (k.to_owned(), v.to_owned()))
                .collect(),
            commit_checkpoint_interval: ICEBERG_DEFAULT_COMMIT_CHECKPOINT_INTERVAL,
            create_table_if_not_exists: false,
            is_exactly_once: Some(true),
            commit_retry_num: 8,
            enable_compaction: true,
            compaction_interval_sec: Some(DEFAULT_ICEBERG_COMPACTION_INTERVAL / 2),
            enable_snapshot_expiration: true,
            write_mode: IcebergWriteMode::MergeOnRead,
            format_version: FormatVersion::V2,
            snapshot_expiration_max_age_millis: None,
            snapshot_expiration_retain_last: None,
            snapshot_expiration_clear_expired_files: true,
            snapshot_expiration_clear_expired_meta_data: true,
            max_snapshots_num_before_compaction: None,
            small_files_threshold_mb: None,
            delete_files_count_threshold: None,
            trigger_snapshot_count: None,
            target_file_size_mb: None,
            compaction_type: None,
            write_parquet_compression: None,
            write_parquet_max_row_group_rows: None,
        };

        assert_eq!(iceberg_config, expected_iceberg_config);

        assert_eq!(
            &iceberg_config.full_table_name().unwrap().to_string(),
            "demo_db.demo_table"
        );
    }
3269
3270 async fn test_create_catalog(configs: BTreeMap<String, String>) {
3271 let iceberg_config = IcebergConfig::from_btreemap(configs).unwrap();
3272
3273 let _table = iceberg_config.load_table().await.unwrap();
3274 }
3275
3276 #[tokio::test]
3277 #[ignore]
3278 async fn test_storage_catalog() {
3279 let values = [
3280 ("connector", "iceberg"),
3281 ("type", "append-only"),
3282 ("force_append_only", "true"),
3283 ("s3.endpoint", "http://127.0.0.1:9301"),
3284 ("s3.access.key", "hummockadmin"),
3285 ("s3.secret.key", "hummockadmin"),
3286 ("s3.region", "us-east-1"),
3287 ("s3.path.style.access", "true"),
3288 ("catalog.name", "demo"),
3289 ("catalog.type", "storage"),
3290 ("warehouse.path", "s3://icebergdata/demo"),
3291 ("database.name", "s1"),
3292 ("table.name", "t1"),
3293 ]
3294 .into_iter()
3295 .map(|(k, v)| (k.to_owned(), v.to_owned()))
3296 .collect();
3297
3298 test_create_catalog(values).await;
3299 }
3300
3301 #[tokio::test]
3302 #[ignore]
3303 async fn test_rest_catalog() {
3304 let values = [
3305 ("connector", "iceberg"),
3306 ("type", "append-only"),
3307 ("force_append_only", "true"),
3308 ("s3.endpoint", "http://127.0.0.1:9301"),
3309 ("s3.access.key", "hummockadmin"),
3310 ("s3.secret.key", "hummockadmin"),
3311 ("s3.region", "us-east-1"),
3312 ("s3.path.style.access", "true"),
3313 ("catalog.name", "demo"),
3314 ("catalog.type", "rest"),
3315 ("catalog.uri", "http://192.168.167.4:8181"),
3316 ("warehouse.path", "s3://icebergdata/demo"),
3317 ("database.name", "s1"),
3318 ("table.name", "t1"),
3319 ]
3320 .into_iter()
3321 .map(|(k, v)| (k.to_owned(), v.to_owned()))
3322 .collect();
3323
3324 test_create_catalog(values).await;
3325 }
3326
3327 #[tokio::test]
3328 #[ignore]
3329 async fn test_jdbc_catalog() {
3330 let values = [
3331 ("connector", "iceberg"),
3332 ("type", "append-only"),
3333 ("force_append_only", "true"),
3334 ("s3.endpoint", "http://127.0.0.1:9301"),
3335 ("s3.access.key", "hummockadmin"),
3336 ("s3.secret.key", "hummockadmin"),
3337 ("s3.region", "us-east-1"),
3338 ("s3.path.style.access", "true"),
3339 ("catalog.name", "demo"),
3340 ("catalog.type", "jdbc"),
3341 ("catalog.uri", "jdbc:postgresql://localhost:5432/iceberg"),
3342 ("catalog.jdbc.user", "admin"),
3343 ("catalog.jdbc.password", "123456"),
3344 ("warehouse.path", "s3://icebergdata/demo"),
3345 ("database.name", "s1"),
3346 ("table.name", "t1"),
3347 ]
3348 .into_iter()
3349 .map(|(k, v)| (k.to_owned(), v.to_owned()))
3350 .collect();
3351
3352 test_create_catalog(values).await;
3353 }
3354
3355 #[tokio::test]
3356 #[ignore]
3357 async fn test_hive_catalog() {
3358 let values = [
3359 ("connector", "iceberg"),
3360 ("type", "append-only"),
3361 ("force_append_only", "true"),
3362 ("s3.endpoint", "http://127.0.0.1:9301"),
3363 ("s3.access.key", "hummockadmin"),
3364 ("s3.secret.key", "hummockadmin"),
3365 ("s3.region", "us-east-1"),
3366 ("s3.path.style.access", "true"),
3367 ("catalog.name", "demo"),
3368 ("catalog.type", "hive"),
3369 ("catalog.uri", "thrift://localhost:9083"),
3370 ("warehouse.path", "s3://icebergdata/demo"),
3371 ("database.name", "s1"),
3372 ("table.name", "t1"),
3373 ]
3374 .into_iter()
3375 .map(|(k, v)| (k.to_owned(), v.to_owned()))
3376 .collect();
3377
3378 test_create_catalog(values).await;
3379 }
3380
    #[test]
    fn test_parse_google_auth_config() {
        let values: BTreeMap<String, String> = [
            ("connector", "iceberg"),
            ("type", "append-only"),
            ("force_append_only", "true"),
            ("catalog.name", "biglake-catalog"),
            ("catalog.type", "rest"),
            ("catalog.uri", "https://biglake.googleapis.com/iceberg/v1/restcatalog"),
            ("warehouse.path", "bq://projects/my-gcp-project"),
            ("catalog.header", "x-goog-user-project=my-gcp-project"),
            ("catalog.security", "google"),
            ("gcp.auth.scopes", "https://www.googleapis.com/auth/cloud-platform,https://www.googleapis.com/auth/bigquery"),
            ("database.name", "my_dataset"),
            ("table.name", "my_table"),
        ]
        .into_iter()
        .map(|(k, v)| (k.to_owned(), v.to_owned()))
        .collect();

        let config = IcebergConfig::from_btreemap(values).unwrap();
        assert_eq!(config.catalog_type(), "rest");
        assert_eq!(config.common.catalog_security.as_deref(), Some("google"));
        assert_eq!(
            config.common.gcp_auth_scopes.as_deref(),
            Some(
                "https://www.googleapis.com/auth/cloud-platform,https://www.googleapis.com/auth/bigquery"
            )
        );
        assert_eq!(
            config.common.warehouse_path.as_deref(),
            Some("bq://projects/my-gcp-project")
        );
        assert_eq!(
            config.common.catalog_header.as_deref(),
            Some("x-goog-user-project=my-gcp-project")
        );
    }

    #[test]
    fn test_parse_oauth2_security_config() {
        let values: BTreeMap<String, String> = [
            ("connector", "iceberg"),
            ("type", "append-only"),
            ("force_append_only", "true"),
            ("catalog.name", "oauth2-catalog"),
            ("catalog.type", "rest"),
            ("catalog.uri", "https://example.com/iceberg/rest"),
            ("warehouse.path", "s3://my-bucket/warehouse"),
            ("catalog.security", "oauth2"),
            ("catalog.credential", "client_id:client_secret"),
            ("catalog.token", "bearer-token"),
            (
                "catalog.oauth2_server_uri",
                "https://oauth.example.com/token",
            ),
            ("catalog.scope", "read write"),
            ("database.name", "test_db"),
            ("table.name", "test_table"),
        ]
        .into_iter()
        .map(|(k, v)| (k.to_owned(), v.to_owned()))
        .collect();

        let iceberg_config = IcebergConfig::from_btreemap(values).unwrap();

        assert_eq!(iceberg_config.catalog_type(), "rest");

        assert_eq!(
            iceberg_config.common.catalog_security.as_deref(),
            Some("oauth2")
        );
        assert_eq!(
            iceberg_config.common.catalog_credential.as_deref(),
            Some("client_id:client_secret")
        );
        assert_eq!(
            iceberg_config.common.catalog_token.as_deref(),
            Some("bearer-token")
        );
        assert_eq!(
            iceberg_config.common.catalog_oauth2_server_uri.as_deref(),
            Some("https://oauth.example.com/token")
        );
        assert_eq!(
            iceberg_config.common.catalog_scope.as_deref(),
            Some("read write")
        );
    }

    #[test]
    fn test_parse_invalid_security_config() {
        let values: BTreeMap<String, String> = [
            ("connector", "iceberg"),
            ("type", "append-only"),
            ("force_append_only", "true"),
            ("catalog.name", "invalid-catalog"),
            ("catalog.type", "rest"),
            ("catalog.uri", "https://example.com/iceberg/rest"),
            ("warehouse.path", "s3://my-bucket/warehouse"),
            ("catalog.security", "invalid_security_type"),
            ("database.name", "test_db"),
            ("table.name", "test_table"),
        ]
        .into_iter()
        .map(|(k, v)| (k.to_owned(), v.to_owned()))
        .collect();

        let iceberg_config = IcebergConfig::from_btreemap(values).unwrap();

        assert_eq!(
            iceberg_config.common.catalog_security.as_deref(),
            Some("invalid_security_type")
        );

        assert_eq!(iceberg_config.catalog_type(), "rest");
    }

    #[test]
    fn test_parse_custom_io_impl_config() {
        let values: BTreeMap<String, String> = [
            ("connector", "iceberg"),
            ("type", "append-only"),
            ("force_append_only", "true"),
            ("catalog.name", "gcs-catalog"),
            ("catalog.type", "rest"),
            ("catalog.uri", "https://example.com/iceberg/rest"),
            ("warehouse.path", "gs://my-bucket/warehouse"),
            ("catalog.security", "google"),
            ("catalog.io_impl", "org.apache.iceberg.gcp.gcs.GCSFileIO"),
            ("database.name", "test_db"),
            ("table.name", "test_table"),
        ]
        .into_iter()
        .map(|(k, v)| (k.to_owned(), v.to_owned()))
        .collect();

        let iceberg_config = IcebergConfig::from_btreemap(values).unwrap();

        assert_eq!(iceberg_config.catalog_type(), "rest");

        assert_eq!(
            iceberg_config.common.catalog_io_impl.as_deref(),
            Some("org.apache.iceberg.gcp.gcs.GCSFileIO")
        );

        assert_eq!(
            iceberg_config.common.catalog_security.as_deref(),
            Some("google")
        );
    }

    #[test]
    fn test_config_constants_consistency() {
        assert_eq!(ENABLE_COMPACTION, "enable_compaction");
        assert_eq!(COMPACTION_INTERVAL_SEC, "compaction_interval_sec");
        assert_eq!(ENABLE_SNAPSHOT_EXPIRATION, "enable_snapshot_expiration");
        assert_eq!(WRITE_MODE, "write_mode");
        assert_eq!(
            SNAPSHOT_EXPIRATION_RETAIN_LAST,
            "snapshot_expiration_retain_last"
        );
        assert_eq!(
            SNAPSHOT_EXPIRATION_MAX_AGE_MILLIS,
            "snapshot_expiration_max_age_millis"
        );
        assert_eq!(
            SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_FILES,
            "snapshot_expiration_clear_expired_files"
        );
        assert_eq!(
            SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_META_DATA,
            "snapshot_expiration_clear_expired_meta_data"
        );
        assert_eq!(COMPACTION_MAX_SNAPSHOTS_NUM, "compaction.max_snapshots_num");
    }

    #[test]
    fn test_parse_compaction_config() {
        // Every compaction-related option set explicitly.
        let values: BTreeMap<String, String> = [
            ("connector", "iceberg"),
            ("type", "upsert"),
            ("primary_key", "id"),
            ("warehouse.path", "s3://iceberg"),
            ("s3.endpoint", "http://127.0.0.1:9301"),
            ("s3.access.key", "test"),
            ("s3.secret.key", "test"),
            ("s3.region", "us-east-1"),
            ("catalog.type", "storage"),
            ("catalog.name", "demo"),
            ("database.name", "test_db"),
            ("table.name", "test_table"),
            ("enable_compaction", "true"),
            ("compaction.max_snapshots_num", "100"),
            ("compaction.small_files_threshold_mb", "512"),
            ("compaction.delete_files_count_threshold", "50"),
            ("compaction.trigger_snapshot_count", "10"),
            ("compaction.target_file_size_mb", "256"),
            ("compaction.type", "full"),
            ("compaction.write_parquet_compression", "zstd"),
            ("compaction.write_parquet_max_row_group_rows", "50000"),
        ]
        .into_iter()
        .map(|(k, v)| (k.to_owned(), v.to_owned()))
        .collect();

        let config = IcebergConfig::from_btreemap(values).unwrap();
        assert!(config.enable_compaction);
        assert_eq!(config.max_snapshots_num_before_compaction, Some(100));
        assert_eq!(config.small_files_threshold_mb, Some(512));
        assert_eq!(config.delete_files_count_threshold, Some(50));
        assert_eq!(config.trigger_snapshot_count, Some(10));
        assert_eq!(config.target_file_size_mb, Some(256));
        assert_eq!(config.compaction_type, Some(CompactionType::Full));
        assert_eq!(config.target_file_size_mb(), 256);
        assert_eq!(config.write_parquet_compression(), "zstd");
        assert_eq!(config.write_parquet_max_row_group_rows(), 50000);

        // Compaction options omitted: the accessors fall back to defaults.
        let values: BTreeMap<String, String> = [
            ("connector", "iceberg"),
            ("type", "append-only"),
            ("force_append_only", "true"),
            ("catalog.name", "test-catalog"),
            ("catalog.type", "storage"),
            ("warehouse.path", "s3://my-bucket/warehouse"),
            ("database.name", "test_db"),
            ("table.name", "test_table"),
        ]
        .into_iter()
        .map(|(k, v)| (k.to_owned(), v.to_owned()))
        .collect();

        let config = IcebergConfig::from_btreemap(values).unwrap();
        assert_eq!(config.target_file_size_mb(), 1024);
        assert_eq!(config.write_parquet_compression(), "zstd");
        assert_eq!(config.write_parquet_max_row_group_rows(), 122880);
    }

    #[test]
    fn test_parse_parquet_compression() {
        use parquet::basic::Compression;

        use super::parse_parquet_compression;

        // Lowercase names map to the corresponding codecs.
        assert!(matches!(
            parse_parquet_compression("snappy"),
            Compression::SNAPPY
        ));
        assert!(matches!(
            parse_parquet_compression("gzip"),
            Compression::GZIP(_)
        ));
        assert!(matches!(
            parse_parquet_compression("zstd"),
            Compression::ZSTD(_)
        ));
        assert!(matches!(parse_parquet_compression("lz4"), Compression::LZ4));
        assert!(matches!(
            parse_parquet_compression("brotli"),
            Compression::BROTLI(_)
        ));
        assert!(matches!(
            parse_parquet_compression("uncompressed"),
            Compression::UNCOMPRESSED
        ));

        // Parsing is case-insensitive.
        assert!(matches!(
            parse_parquet_compression("SNAPPY"),
            Compression::SNAPPY
        ));
        assert!(matches!(
            parse_parquet_compression("Zstd"),
            Compression::ZSTD(_)
        ));

        // Unknown names fall back to snappy.
        assert!(matches!(
            parse_parquet_compression("invalid"),
            Compression::SNAPPY
        ));
    }

    #[test]
    fn test_append_only_rejects_copy_on_write() {
        let values = [
            ("connector", "iceberg"),
            ("type", "append-only"),
            ("warehouse.path", "s3://iceberg"),
            ("s3.endpoint", "http://127.0.0.1:9301"),
            ("s3.access.key", "test"),
            ("s3.secret.key", "test"),
            ("s3.region", "us-east-1"),
            ("catalog.type", "storage"),
            ("catalog.name", "demo"),
            ("database.name", "test_db"),
            ("table.name", "test_table"),
            ("write_mode", "copy-on-write"),
        ]
        .into_iter()
        .map(|(k, v)| (k.to_owned(), v.to_owned()))
        .collect();

        let result = IcebergConfig::from_btreemap(values);
        assert!(result.is_err());
        assert!(
            result
                .unwrap_err()
                .to_string()
                .contains("'copy-on-write' mode is not supported for append-only iceberg sink")
        );
    }

    #[test]
    fn test_append_only_accepts_merge_on_read() {
        let values = [
            ("connector", "iceberg"),
            ("type", "append-only"),
            ("warehouse.path", "s3://iceberg"),
            ("s3.endpoint", "http://127.0.0.1:9301"),
            ("s3.access.key", "test"),
            ("s3.secret.key", "test"),
            ("s3.region", "us-east-1"),
            ("catalog.type", "storage"),
            ("catalog.name", "demo"),
            ("database.name", "test_db"),
            ("table.name", "test_table"),
            ("write_mode", "merge-on-read"),
        ]
        .into_iter()
        .map(|(k, v)| (k.to_owned(), v.to_owned()))
        .collect();

        let result = IcebergConfig::from_btreemap(values);
        assert!(result.is_ok());
        let config = result.unwrap();
        assert_eq!(config.write_mode, IcebergWriteMode::MergeOnRead);
    }

    #[test]
    fn test_append_only_defaults_to_merge_on_read() {
        let values = [
            ("connector", "iceberg"),
            ("type", "append-only"),
            ("warehouse.path", "s3://iceberg"),
            ("s3.endpoint", "http://127.0.0.1:9301"),
            ("s3.access.key", "test"),
            ("s3.secret.key", "test"),
            ("s3.region", "us-east-1"),
            ("catalog.type", "storage"),
            ("catalog.name", "demo"),
            ("database.name", "test_db"),
            ("table.name", "test_table"),
        ]
        .into_iter()
        .map(|(k, v)| (k.to_owned(), v.to_owned()))
        .collect();

        let result = IcebergConfig::from_btreemap(values);
        assert!(result.is_ok());
        let config = result.unwrap();
        assert_eq!(config.write_mode, IcebergWriteMode::MergeOnRead);
    }

    #[test]
    fn test_upsert_accepts_copy_on_write() {
        let values = [
            ("connector", "iceberg"),
            ("type", "upsert"),
            ("primary_key", "id"),
            ("warehouse.path", "s3://iceberg"),
            ("s3.endpoint", "http://127.0.0.1:9301"),
            ("s3.access.key", "test"),
            ("s3.secret.key", "test"),
            ("s3.region", "us-east-1"),
            ("catalog.type", "storage"),
            ("catalog.name", "demo"),
            ("database.name", "test_db"),
            ("table.name", "test_table"),
            ("write_mode", "copy-on-write"),
        ]
        .into_iter()
        .map(|(k, v)| (k.to_owned(), v.to_owned()))
        .collect();

        let result = IcebergConfig::from_btreemap(values);
        assert!(result.is_ok());
        let config = result.unwrap();
        assert_eq!(config.write_mode, IcebergWriteMode::CopyOnWrite);
    }
}