mod prometheus;

use std::collections::{BTreeMap, HashMap};
use std::fmt::Debug;
use std::num::NonZeroU64;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;

use anyhow::{Context, anyhow};
use async_trait::async_trait;
use await_tree::InstrumentAwait;
use iceberg::arrow::{
    RecordBatchPartitionSplitter, arrow_schema_to_schema, schema_to_arrow_schema,
};
use iceberg::spec::{
    DataFile, FormatVersion, MAIN_BRANCH, Operation, PartitionSpecRef,
    SchemaRef as IcebergSchemaRef, SerializedDataFile, TableProperties, Transform,
    UnboundPartitionField, UnboundPartitionSpec,
};
use iceberg::table::Table;
use iceberg::transaction::{ApplyTransactionAction, FastAppendAction, Transaction};
use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
use iceberg::writer::base_writer::deletion_vector_writer::{
    DeletionVectorWriter, DeletionVectorWriterBuilder,
};
use iceberg::writer::base_writer::equality_delete_writer::{
    EqualityDeleteFileWriterBuilder, EqualityDeleteWriterConfig,
};
use iceberg::writer::base_writer::position_delete_file_writer::{
    POSITION_DELETE_SCHEMA, PositionDeleteFileWriter, PositionDeleteFileWriterBuilder,
    PositionDeleteInput,
};
use iceberg::writer::delta_writer::{DELETE_OP, DeltaWriterBuilder, INSERT_OP};
use iceberg::writer::file_writer::ParquetWriterBuilder;
use iceberg::writer::file_writer::location_generator::{
    DefaultFileNameGenerator, DefaultLocationGenerator,
};
use iceberg::writer::file_writer::rolling_writer::RollingFileWriterBuilder;
use iceberg::writer::task_writer::TaskWriter;
use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
use iceberg::{Catalog, NamespaceIdent, TableCreation, TableIdent};
use itertools::Itertools;
use parquet::basic::Compression;
use parquet::file::properties::WriterProperties;
use prometheus::monitored_general_writer::MonitoredGeneralWriterBuilder;
use regex::Regex;
use risingwave_common::array::arrow::arrow_array_iceberg::{Int32Array, RecordBatch};
use risingwave_common::array::arrow::arrow_schema_iceberg::{
    self, DataType as ArrowDataType, Field as ArrowField, Fields as ArrowFields,
    Schema as ArrowSchema, SchemaRef,
};
use risingwave_common::array::arrow::{IcebergArrowConvert, IcebergCreateTableArrowConvert};
use risingwave_common::array::{Op, StreamChunk};
use risingwave_common::bail;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{Field, Schema};
use risingwave_common::error::IcebergError;
use risingwave_common::metrics::{LabelGuardedHistogram, LabelGuardedIntCounter};
use risingwave_common_estimate_size::EstimateSize;
use risingwave_pb::connector_service::SinkMetadata;
use risingwave_pb::connector_service::sink_metadata::Metadata::Serialized;
use risingwave_pb::connector_service::sink_metadata::SerializedMetadata;
use risingwave_pb::stream_plan::PbSinkSchemaChange;
use serde::de::{self, Deserializer, Visitor};
use serde::{Deserialize, Serialize};
use serde_json::from_value;
use serde_with::{DisplayFromStr, serde_as};
use thiserror_ext::AsReport;
use tokio::sync::mpsc::UnboundedSender;
use tokio_retry::RetryIf;
use tokio_retry::strategy::{ExponentialBackoff, jitter};
use tracing::warn;
use url::Url;
use uuid::Uuid;
use with_options::WithOptions;

use super::decouple_checkpoint_log_sink::iceberg_default_commit_checkpoint_interval;
use super::{
    GLOBAL_SINK_METRICS, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT, Sink,
    SinkError, SinkWriterParam,
};
use crate::connector_common::{IcebergCommon, IcebergSinkCompactionUpdate, IcebergTableIdentifier};
use crate::enforce_secret::EnforceSecret;
use crate::sink::catalog::SinkId;
use crate::sink::coordinate::CoordinatedLogSinker;
use crate::sink::writer::SinkWriter;
use crate::sink::{
    Result, SinglePhaseCommitCoordinator, SinkCommitCoordinator, SinkParam,
    TwoPhaseCommitCoordinator,
};
use crate::{deserialize_bool_from_string, deserialize_optional_string_seq_from_string};

pub const ICEBERG_SINK: &str = "iceberg";

pub const ICEBERG_COW_BRANCH: &str = "ingestion";
pub const ICEBERG_WRITE_MODE_MERGE_ON_READ: &str = "merge-on-read";
pub const ICEBERG_WRITE_MODE_COPY_ON_WRITE: &str = "copy-on-write";
pub const ICEBERG_COMPACTION_TYPE_FULL: &str = "full";
pub const ICEBERG_COMPACTION_TYPE_SMALL_FILES: &str = "small-files";
pub const ICEBERG_COMPACTION_TYPE_FILES_WITH_DELETE: &str = "files-with-delete";

pub const PARTITION_DATA_ID_START: i32 = 1000;

#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
pub enum IcebergWriteMode {
    #[default]
    MergeOnRead,
    CopyOnWrite,
}

impl IcebergWriteMode {
    pub fn as_str(self) -> &'static str {
        match self {
            IcebergWriteMode::MergeOnRead => ICEBERG_WRITE_MODE_MERGE_ON_READ,
            IcebergWriteMode::CopyOnWrite => ICEBERG_WRITE_MODE_COPY_ON_WRITE,
        }
    }
}

impl std::str::FromStr for IcebergWriteMode {
    type Err = SinkError;

    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
        match s {
            ICEBERG_WRITE_MODE_MERGE_ON_READ => Ok(IcebergWriteMode::MergeOnRead),
            ICEBERG_WRITE_MODE_COPY_ON_WRITE => Ok(IcebergWriteMode::CopyOnWrite),
            _ => Err(SinkError::Config(anyhow!(format!(
                "invalid write_mode: {}, must be one of: {}, {}",
                s, ICEBERG_WRITE_MODE_MERGE_ON_READ, ICEBERG_WRITE_MODE_COPY_ON_WRITE
            )))),
        }
    }
}

impl TryFrom<&str> for IcebergWriteMode {
    type Error = <Self as std::str::FromStr>::Err;

    fn try_from(value: &str) -> std::result::Result<Self, Self::Error> {
        value.parse()
    }
}

impl TryFrom<String> for IcebergWriteMode {
    type Error = <Self as std::str::FromStr>::Err;

    fn try_from(value: String) -> std::result::Result<Self, Self::Error> {
        value.as_str().parse()
    }
}

impl std::fmt::Display for IcebergWriteMode {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(self.as_str())
    }
}
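// Illustrative round-trip (comment added for clarity, not in the original source): the
// `write_mode` option value parses to the enum and displays back as the same literal, e.g.
//     assert_eq!("merge-on-read".parse::<IcebergWriteMode>().unwrap(), IcebergWriteMode::MergeOnRead);
//     assert_eq!(IcebergWriteMode::CopyOnWrite.to_string(), "copy-on-write");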

pub const ENABLE_COMPACTION: &str = "enable_compaction";
pub const COMPACTION_INTERVAL_SEC: &str = "compaction_interval_sec";
pub const ENABLE_SNAPSHOT_EXPIRATION: &str = "enable_snapshot_expiration";
pub const WRITE_MODE: &str = "write_mode";
pub const FORMAT_VERSION: &str = "format_version";
pub const SNAPSHOT_EXPIRATION_RETAIN_LAST: &str = "snapshot_expiration_retain_last";
pub const SNAPSHOT_EXPIRATION_MAX_AGE_MILLIS: &str = "snapshot_expiration_max_age_millis";
pub const SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_FILES: &str = "snapshot_expiration_clear_expired_files";
pub const SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_META_DATA: &str =
    "snapshot_expiration_clear_expired_meta_data";
pub const COMPACTION_MAX_SNAPSHOTS_NUM: &str = "compaction.max_snapshots_num";

pub const COMPACTION_SMALL_FILES_THRESHOLD_MB: &str = "compaction.small_files_threshold_mb";

pub const COMPACTION_DELETE_FILES_COUNT_THRESHOLD: &str = "compaction.delete_files_count_threshold";

pub const COMPACTION_TRIGGER_SNAPSHOT_COUNT: &str = "compaction.trigger_snapshot_count";

pub const COMPACTION_TARGET_FILE_SIZE_MB: &str = "compaction.target_file_size_mb";

pub const COMPACTION_TYPE: &str = "compaction.type";

pub const COMPACTION_WRITE_PARQUET_COMPRESSION: &str = "compaction.write_parquet_compression";
pub const COMPACTION_WRITE_PARQUET_MAX_ROW_GROUP_ROWS: &str =
    "compaction.write_parquet_max_row_group_rows";
pub const COMMIT_CHECKPOINT_SIZE_THRESHOLD_MB: &str = "commit_checkpoint_size_threshold_mb";
pub const ICEBERG_DEFAULT_COMMIT_CHECKPOINT_SIZE_THRESHOLD_MB: u64 = 128;

const PARQUET_CREATED_BY: &str = concat!("risingwave version ", env!("CARGO_PKG_VERSION"));

fn default_commit_retry_num() -> u32 {
    8
}

fn default_commit_checkpoint_size_threshold_mb() -> Option<u64> {
    Some(ICEBERG_DEFAULT_COMMIT_CHECKPOINT_SIZE_THRESHOLD_MB)
}

fn default_iceberg_write_mode() -> IcebergWriteMode {
    IcebergWriteMode::MergeOnRead
}

fn default_iceberg_format_version() -> FormatVersion {
    FormatVersion::V2
}

fn default_true() -> bool {
    true
}

fn default_some_true() -> Option<bool> {
    Some(true)
}

fn parse_format_version_str(value: &str) -> std::result::Result<FormatVersion, String> {
    let parsed = value
        .trim()
        .parse::<u8>()
        .map_err(|_| "`format-version` must be one of 1, 2, or 3".to_owned())?;
    match parsed {
        1 => Ok(FormatVersion::V1),
        2 => Ok(FormatVersion::V2),
        3 => Ok(FormatVersion::V3),
        _ => Err("`format-version` must be one of 1, 2, or 3".to_owned()),
    }
}
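// Illustrative behavior (comment added for clarity, not in the original source):
// `parse_format_version_str(" 2 ")` trims the input and yields `FormatVersion::V2`, while
// "0", "4", or non-numeric input returns the "`format-version` must be one of 1, 2, or 3" error.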

fn deserialize_format_version<'de, D>(
    deserializer: D,
) -> std::result::Result<FormatVersion, D::Error>
where
    D: Deserializer<'de>,
{
    struct FormatVersionVisitor;

    impl<'de> Visitor<'de> for FormatVersionVisitor {
        type Value = FormatVersion;

        fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
            formatter.write_str("format-version as 1, 2, or 3")
        }

        fn visit_u64<E>(self, value: u64) -> std::result::Result<Self::Value, E>
        where
            E: de::Error,
        {
            let value = u8::try_from(value)
                .map_err(|_| E::custom("`format-version` must be one of 1, 2, or 3"))?;
            parse_format_version_str(&value.to_string()).map_err(E::custom)
        }

        fn visit_i64<E>(self, value: i64) -> std::result::Result<Self::Value, E>
        where
            E: de::Error,
        {
            let value = u8::try_from(value)
                .map_err(|_| E::custom("`format-version` must be one of 1, 2, or 3"))?;
            parse_format_version_str(&value.to_string()).map_err(E::custom)
        }

        fn visit_str<E>(self, value: &str) -> std::result::Result<Self::Value, E>
        where
            E: de::Error,
        {
            parse_format_version_str(value).map_err(E::custom)
        }

        fn visit_string<E>(self, value: String) -> std::result::Result<Self::Value, E>
        where
            E: de::Error,
        {
            self.visit_str(&value)
        }
    }

    deserializer.deserialize_any(FormatVersionVisitor)
}

#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(rename_all = "kebab-case")]
pub enum CompactionType {
    #[default]
    Full,
    SmallFiles,
    FilesWithDelete,
}

impl CompactionType {
    pub fn as_str(&self) -> &'static str {
        match self {
            CompactionType::Full => ICEBERG_COMPACTION_TYPE_FULL,
            CompactionType::SmallFiles => ICEBERG_COMPACTION_TYPE_SMALL_FILES,
            CompactionType::FilesWithDelete => ICEBERG_COMPACTION_TYPE_FILES_WITH_DELETE,
        }
    }
}

impl std::str::FromStr for CompactionType {
    type Err = SinkError;

    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
        match s {
            ICEBERG_COMPACTION_TYPE_FULL => Ok(CompactionType::Full),
            ICEBERG_COMPACTION_TYPE_SMALL_FILES => Ok(CompactionType::SmallFiles),
            ICEBERG_COMPACTION_TYPE_FILES_WITH_DELETE => Ok(CompactionType::FilesWithDelete),
            _ => Err(SinkError::Config(anyhow!(format!(
                "invalid compaction_type: {}, must be one of: {}, {}, {}",
                s,
                ICEBERG_COMPACTION_TYPE_FULL,
                ICEBERG_COMPACTION_TYPE_SMALL_FILES,
                ICEBERG_COMPACTION_TYPE_FILES_WITH_DELETE
            )))),
        }
    }
}

impl TryFrom<&str> for CompactionType {
    type Error = <Self as std::str::FromStr>::Err;

    fn try_from(value: &str) -> std::result::Result<Self, Self::Error> {
        value.parse()
    }
}

impl TryFrom<String> for CompactionType {
    type Error = <Self as std::str::FromStr>::Err;

    fn try_from(value: String) -> std::result::Result<Self, Self::Error> {
        value.as_str().parse()
    }
}

impl std::fmt::Display for CompactionType {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}", self.as_str())
    }
}
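// Note (comment added for clarity, not in the original source): the serde `kebab-case` rename
// on `CompactionType` lines up with the string constants above, so the `compaction.type` option
// accepts exactly "full", "small-files", and "files-with-delete" whether it is parsed through
// serde or through `FromStr`.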

#[serde_as]
#[derive(Debug, Clone, PartialEq, Eq, Deserialize, WithOptions)]
pub struct IcebergConfig {
    pub r#type: String,

    #[serde(default, deserialize_with = "deserialize_bool_from_string")]
    pub force_append_only: bool,

    #[serde(flatten)]
    common: IcebergCommon,

    #[serde(flatten)]
    table: IcebergTableIdentifier,

    #[serde(
        rename = "primary_key",
        default,
        deserialize_with = "deserialize_optional_string_seq_from_string"
    )]
    pub primary_key: Option<Vec<String>>,

    #[serde(skip)]
    pub java_catalog_props: HashMap<String, String>,

    #[serde(default)]
    pub partition_by: Option<String>,

    #[serde(default = "iceberg_default_commit_checkpoint_interval")]
    #[serde_as(as = "DisplayFromStr")]
    #[with_option(allow_alter_on_fly)]
    pub commit_checkpoint_interval: u64,

    #[serde(default = "default_commit_checkpoint_size_threshold_mb")]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    pub commit_checkpoint_size_threshold_mb: Option<u64>,

    #[serde(default, deserialize_with = "deserialize_bool_from_string")]
    pub create_table_if_not_exists: bool,

    #[serde(default = "default_some_true")]
    #[serde_as(as = "Option<DisplayFromStr>")]
    pub is_exactly_once: Option<bool>,

    #[serde(default = "default_commit_retry_num")]
    pub commit_retry_num: u32,

    #[serde(
        rename = "enable_compaction",
        default,
        deserialize_with = "deserialize_bool_from_string"
    )]
    #[with_option(allow_alter_on_fly)]
    pub enable_compaction: bool,

    #[serde(rename = "compaction_interval_sec", default)]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    pub compaction_interval_sec: Option<u64>,

    #[serde(
        rename = "enable_snapshot_expiration",
        default,
        deserialize_with = "deserialize_bool_from_string"
    )]
    #[with_option(allow_alter_on_fly)]
    pub enable_snapshot_expiration: bool,

    #[serde(rename = "write_mode", default = "default_iceberg_write_mode")]
    pub write_mode: IcebergWriteMode,

    #[serde(
        rename = "format_version",
        default = "default_iceberg_format_version",
        deserialize_with = "deserialize_format_version"
    )]
    pub format_version: FormatVersion,

    #[serde(rename = "snapshot_expiration_max_age_millis", default)]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    pub snapshot_expiration_max_age_millis: Option<i64>,

    #[serde(rename = "snapshot_expiration_retain_last", default)]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    pub snapshot_expiration_retain_last: Option<i32>,

    #[serde(
        rename = "snapshot_expiration_clear_expired_files",
        default = "default_true",
        deserialize_with = "deserialize_bool_from_string"
    )]
    #[with_option(allow_alter_on_fly)]
    pub snapshot_expiration_clear_expired_files: bool,

    #[serde(
        rename = "snapshot_expiration_clear_expired_meta_data",
        default = "default_true",
        deserialize_with = "deserialize_bool_from_string"
    )]
    #[with_option(allow_alter_on_fly)]
    pub snapshot_expiration_clear_expired_meta_data: bool,

    #[serde(rename = "compaction.max_snapshots_num", default)]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    pub max_snapshots_num_before_compaction: Option<usize>,

    #[serde(rename = "compaction.small_files_threshold_mb", default)]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    pub small_files_threshold_mb: Option<u64>,

    #[serde(rename = "compaction.delete_files_count_threshold", default)]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    pub delete_files_count_threshold: Option<usize>,

    #[serde(rename = "compaction.trigger_snapshot_count", default)]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    pub trigger_snapshot_count: Option<usize>,

    #[serde(rename = "compaction.target_file_size_mb", default)]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    pub target_file_size_mb: Option<u64>,

    #[serde(rename = "compaction.type", default)]
    #[with_option(allow_alter_on_fly)]
    pub compaction_type: Option<CompactionType>,

    #[serde(rename = "compaction.write_parquet_compression", default)]
    #[with_option(allow_alter_on_fly)]
    pub write_parquet_compression: Option<String>,

    #[serde(rename = "compaction.write_parquet_max_row_group_rows", default)]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    pub write_parquet_max_row_group_rows: Option<usize>,
}

impl EnforceSecret for IcebergConfig {
    fn enforce_secret<'a>(
        prop_iter: impl Iterator<Item = &'a str>,
    ) -> crate::error::ConnectorResult<()> {
        for prop in prop_iter {
            IcebergCommon::enforce_one(prop)?;
        }
        Ok(())
    }

    fn enforce_one(prop: &str) -> crate::error::ConnectorResult<()> {
        IcebergCommon::enforce_one(prop)
    }
}

impl IcebergConfig {
    fn validate_append_only_write_mode(
        sink_type: &str,
        write_mode: IcebergWriteMode,
    ) -> Result<()> {
        if sink_type == SINK_TYPE_APPEND_ONLY && write_mode == IcebergWriteMode::CopyOnWrite {
            return Err(SinkError::Config(anyhow!(
                "'copy-on-write' mode is not supported for append-only iceberg sink. \
                Please use 'merge-on-read' instead, which is strictly better for append-only workloads."
            )));
        }
        Ok(())
    }

    pub fn from_btreemap(values: BTreeMap<String, String>) -> Result<Self> {
        let mut config =
            serde_json::from_value::<IcebergConfig>(serde_json::to_value(&values).unwrap())
                .map_err(|e| SinkError::Config(anyhow!(e)))?;

        if config.r#type != SINK_TYPE_APPEND_ONLY && config.r#type != SINK_TYPE_UPSERT {
            return Err(SinkError::Config(anyhow!(
                "`{}` must be either {} or {}",
                SINK_TYPE_OPTION,
                SINK_TYPE_APPEND_ONLY,
                SINK_TYPE_UPSERT
            )));
        }

        if config.r#type == SINK_TYPE_UPSERT {
            if let Some(primary_key) = &config.primary_key {
                if primary_key.is_empty() {
                    return Err(SinkError::Config(anyhow!(
                        "`primary_key` must not be empty in {}",
                        SINK_TYPE_UPSERT
                    )));
                }
            } else {
                return Err(SinkError::Config(anyhow!(
                    "must set `primary_key` in {}",
                    SINK_TYPE_UPSERT
                )));
            }
        }

        Self::validate_append_only_write_mode(&config.r#type, config.write_mode)?;

        config.java_catalog_props = values
            .iter()
            .filter(|(k, _v)| {
                k.starts_with("catalog.")
                    && k != &"catalog.uri"
                    && k != &"catalog.type"
                    && k != &"catalog.name"
                    && k != &"catalog.header"
            })
            .map(|(k, v)| (k[8..].to_string(), v.clone()))
            .collect();

        if config.commit_checkpoint_interval == 0 {
            return Err(SinkError::Config(anyhow!(
                "`commit_checkpoint_interval` must be greater than 0"
            )));
        }

        if config.commit_checkpoint_size_threshold_mb == Some(0) {
            return Err(SinkError::Config(anyhow!(
                "`commit_checkpoint_size_threshold_mb` must be greater than 0"
            )));
        }

        Ok(config)
    }

    pub fn catalog_type(&self) -> &str {
        self.common.catalog_type()
    }

    pub async fn load_table(&self) -> Result<Table> {
        self.common
            .load_table(&self.table, &self.java_catalog_props)
            .await
            .map_err(Into::into)
    }

    pub async fn create_catalog(&self) -> Result<Arc<dyn Catalog>> {
        self.common
            .create_catalog(&self.java_catalog_props)
            .await
            .map_err(Into::into)
    }

    pub fn full_table_name(&self) -> Result<TableIdent> {
        self.table.to_table_ident().map_err(Into::into)
    }

    pub fn catalog_name(&self) -> String {
        self.common.catalog_name()
    }

    pub fn table_format_version(&self) -> FormatVersion {
        self.format_version
    }

    pub fn compaction_interval_sec(&self) -> u64 {
        self.compaction_interval_sec.unwrap_or(3600)
    }

    pub fn snapshot_expiration_timestamp_ms(&self, current_time_ms: i64) -> Option<i64> {
        self.snapshot_expiration_max_age_millis
            .map(|max_age_millis| current_time_ms - max_age_millis)
    }
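    // Worked example (comment added for clarity, not in the original source): with
    // `snapshot_expiration_max_age_millis = Some(86_400_000)` (one day), this returns
    // `Some(current_time_ms - 86_400_000)`, i.e. snapshots older than one day become eligible
    // for expiration; `None` disables age-based expiration.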

    pub fn trigger_snapshot_count(&self) -> usize {
        self.trigger_snapshot_count.unwrap_or(usize::MAX)
    }

    pub fn small_files_threshold_mb(&self) -> u64 {
        self.small_files_threshold_mb.unwrap_or(64)
    }

    pub fn delete_files_count_threshold(&self) -> usize {
        self.delete_files_count_threshold.unwrap_or(256)
    }

    pub fn target_file_size_mb(&self) -> u64 {
        self.target_file_size_mb.unwrap_or(1024)
    }

    pub fn commit_checkpoint_size_threshold_bytes(&self) -> Option<u64> {
        self.commit_checkpoint_size_threshold_mb
            .map(|threshold_mb| threshold_mb.saturating_mul(1024 * 1024))
    }
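    // Worked example (comment added for clarity, not in the original source): the default
    // threshold of 128 MB maps to Some(128 * 1024 * 1024) = Some(134_217_728) bytes;
    // `saturating_mul` keeps an absurdly large configured value from overflowing.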

    pub fn compaction_type(&self) -> CompactionType {
        self.compaction_type.unwrap_or_default()
    }

    pub fn write_parquet_compression(&self) -> &str {
        self.write_parquet_compression.as_deref().unwrap_or("zstd")
    }

    pub fn write_parquet_max_row_group_rows(&self) -> usize {
        self.write_parquet_max_row_group_rows.unwrap_or(122880)
    }

    pub fn get_parquet_compression(&self) -> Compression {
        parse_parquet_compression(self.write_parquet_compression())
    }
}

fn parse_parquet_compression(codec: &str) -> Compression {
    match codec.to_lowercase().as_str() {
        "uncompressed" => Compression::UNCOMPRESSED,
        "snappy" => Compression::SNAPPY,
        "gzip" => Compression::GZIP(Default::default()),
        "lzo" => Compression::LZO,
        "brotli" => Compression::BROTLI(Default::default()),
        "lz4" => Compression::LZ4,
        "zstd" => Compression::ZSTD(Default::default()),
        _ => {
            tracing::warn!(
                "Unknown compression codec '{}', falling back to SNAPPY",
                codec
            );
            Compression::SNAPPY
        }
    }
}
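// Illustrative mapping (comment added for clarity, not in the original source):
// `parse_parquet_compression("ZSTD")` lowercases the input and returns
// `Compression::ZSTD(Default::default())` (default level), while an unrecognized codec such as
// "lz77" logs a warning and falls back to `Compression::SNAPPY`.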

pub struct IcebergSink {
    pub config: IcebergConfig,
    param: SinkParam,
    unique_column_ids: Option<Vec<usize>>,
}

impl EnforceSecret for IcebergSink {
    fn enforce_secret<'a>(
        prop_iter: impl Iterator<Item = &'a str>,
    ) -> crate::error::ConnectorResult<()> {
        for prop in prop_iter {
            IcebergConfig::enforce_one(prop)?;
        }
        Ok(())
    }
}

impl TryFrom<SinkParam> for IcebergSink {
    type Error = SinkError;

    fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
        let config = IcebergConfig::from_btreemap(param.properties.clone())?;
        IcebergSink::new(config, param)
    }
}

impl Debug for IcebergSink {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("IcebergSink")
            .field("config", &self.config)
            .finish()
    }
}

async fn create_and_validate_table_impl(
    config: &IcebergConfig,
    param: &SinkParam,
) -> Result<Table> {
    if config.create_table_if_not_exists {
        create_table_if_not_exists_impl(config, param).await?;
    }

    let table = config
        .load_table()
        .await
        .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;

    let sink_schema = param.schema();
    let iceberg_arrow_schema = schema_to_arrow_schema(table.metadata().current_schema())
        .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;

    try_matches_arrow_schema(&sink_schema, &iceberg_arrow_schema)
        .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;

    Ok(table)
}

async fn create_table_if_not_exists_impl(config: &IcebergConfig, param: &SinkParam) -> Result<()> {
    let catalog = config.create_catalog().await?;
    let namespace = if let Some(database_name) = config.table.database_name() {
        let namespace = NamespaceIdent::new(database_name.to_owned());
        if !catalog
            .namespace_exists(&namespace)
            .await
            .map_err(|e| SinkError::Iceberg(anyhow!(e)))?
        {
            catalog
                .create_namespace(&namespace, HashMap::default())
                .await
                .map_err(|e| SinkError::Iceberg(anyhow!(e)))
                .context("failed to create iceberg namespace")?;
        }
        namespace
    } else {
        bail!("database name must be set if you want to create table")
    };

    let table_id = config
        .full_table_name()
        .context("Unable to parse table name")?;
    if !catalog
        .table_exists(&table_id)
        .await
        .map_err(|e| SinkError::Iceberg(anyhow!(e)))?
    {
        let iceberg_create_table_arrow_convert = IcebergCreateTableArrowConvert::default();
        let arrow_fields = param
            .columns
            .iter()
            .map(|column| {
                Ok(iceberg_create_table_arrow_convert
                    .to_arrow_field(&column.name, &column.data_type)
                    .map_err(|e| SinkError::Iceberg(anyhow!(e)))
                    .context(format!(
                        "failed to convert {}: {} to arrow type",
                        &column.name, &column.data_type
                    ))?)
            })
            .collect::<Result<Vec<ArrowField>>>()?;
        let arrow_schema = arrow_schema_iceberg::Schema::new(arrow_fields);
        let iceberg_schema = iceberg::arrow::arrow_schema_to_schema(&arrow_schema)
            .map_err(|e| SinkError::Iceberg(anyhow!(e)))
            .context("failed to convert arrow schema to iceberg schema")?;

        let location = {
            let mut names = namespace.clone().inner();
            names.push(config.table.table_name().to_owned());
            match &config.common.warehouse_path {
                Some(warehouse_path) => {
                    let is_s3_tables = warehouse_path.starts_with("arn:aws:s3tables");
                    let is_bq_catalog_federation = warehouse_path.starts_with("bq://");
                    let url = Url::parse(warehouse_path);
                    if url.is_err() || is_s3_tables || is_bq_catalog_federation {
                        if config.common.catalog_type() == "rest"
                            || config.common.catalog_type() == "rest_rust"
                        {
                            None
                        } else {
                            bail!(format!("Invalid warehouse path: {}", warehouse_path))
                        }
                    } else if warehouse_path.ends_with('/') {
                        Some(format!("{}{}", warehouse_path, names.join("/")))
                    } else {
                        Some(format!("{}/{}", warehouse_path, names.join("/")))
                    }
                }
                None => None,
            }
        };

        let partition_spec = match &config.partition_by {
            Some(partition_by) => {
                let mut partition_fields = Vec::<UnboundPartitionField>::new();
                for (i, (column, transform)) in parse_partition_by_exprs(partition_by.clone())?
                    .into_iter()
                    .enumerate()
                {
                    match iceberg_schema.field_id_by_name(&column) {
                        Some(id) => partition_fields.push(
                            UnboundPartitionField::builder()
                                .source_id(id)
                                .transform(transform)
                                .name(format!("_p_{}", column))
                                .field_id(PARTITION_DATA_ID_START + i as i32)
                                .build(),
                        ),
                        None => bail!(format!(
                            "Partition source column does not exist in schema: {}",
                            column
                        )),
                    };
                }
                Some(
                    UnboundPartitionSpec::builder()
                        .with_spec_id(0)
                        .add_partition_fields(partition_fields)
                        .map_err(|e| SinkError::Iceberg(anyhow!(e)))
                        .context("failed to add partition columns")?
                        .build(),
                )
            }
            None => None,
        };

        let properties = HashMap::from([(
            TableProperties::PROPERTY_FORMAT_VERSION.to_owned(),
            (config.format_version as u8).to_string(),
        )]);

        let table_creation_builder = TableCreation::builder()
            .name(config.table.table_name().to_owned())
            .schema(iceberg_schema)
            .format_version(config.table_format_version())
            .properties(properties);

        let table_creation = match (location, partition_spec) {
            (Some(location), Some(partition_spec)) => table_creation_builder
                .location(location)
                .partition_spec(partition_spec)
                .build(),
            (Some(location), None) => table_creation_builder.location(location).build(),
            (None, Some(partition_spec)) => table_creation_builder
                .partition_spec(partition_spec)
                .build(),
            (None, None) => table_creation_builder.build(),
        };

        catalog
            .create_table(&namespace, table_creation)
            .await
            .map_err(|e| SinkError::Iceberg(anyhow!(e)))
            .context("failed to create iceberg table")?;
    }
    Ok(())
}

impl IcebergSink {
    pub async fn create_and_validate_table(&self) -> Result<Table> {
        create_and_validate_table_impl(&self.config, &self.param).await
    }

    pub async fn create_table_if_not_exists(&self) -> Result<()> {
        create_table_if_not_exists_impl(&self.config, &self.param).await
    }

    pub fn new(config: IcebergConfig, param: SinkParam) -> Result<Self> {
        let unique_column_ids = if config.r#type == SINK_TYPE_UPSERT && !config.force_append_only {
            if let Some(pk) = &config.primary_key {
                let mut unique_column_ids = Vec::with_capacity(pk.len());
                for col_name in pk {
                    let id = param
                        .columns
                        .iter()
                        .find(|col| col.name.as_str() == col_name)
                        .ok_or_else(|| {
                            SinkError::Config(anyhow!(
                                "Primary key column {} not found in sink schema",
                                col_name
                            ))
                        })?
                        .column_id
                        .get_id() as usize;
                    unique_column_ids.push(id);
                }
                Some(unique_column_ids)
            } else {
                unreachable!()
            }
        } else {
            None
        };
        Ok(Self {
            config,
            param,
            unique_column_ids,
        })
    }
}

impl Sink for IcebergSink {
    type LogSinker = CoordinatedLogSinker<IcebergSinkWriter>;

    const SINK_NAME: &'static str = ICEBERG_SINK;

    async fn validate(&self) -> Result<()> {
        if "snowflake".eq_ignore_ascii_case(self.config.catalog_type()) {
            bail!("Snowflake catalog only supports iceberg sources");
        }

        if "glue".eq_ignore_ascii_case(self.config.catalog_type()) {
            risingwave_common::license::Feature::IcebergSinkWithGlue
                .check_available()
                .map_err(|e| anyhow::anyhow!(e))?;
        }

        IcebergConfig::validate_append_only_write_mode(
            &self.config.r#type,
            self.config.write_mode,
        )?;

        let compaction_type = self.config.compaction_type();

        if self.config.write_mode == IcebergWriteMode::CopyOnWrite
            && compaction_type != CompactionType::Full
        {
            bail!(
                "'copy-on-write' mode only supports 'full' compaction type, got: '{}'",
                compaction_type
            );
        }

        match compaction_type {
            CompactionType::SmallFiles => {
                risingwave_common::license::Feature::IcebergCompaction
                    .check_available()
                    .map_err(|e| anyhow::anyhow!(e))?;

                if self.config.write_mode != IcebergWriteMode::MergeOnRead {
                    bail!(
                        "'small-files' compaction type only supports 'merge-on-read' write mode, got: '{}'",
                        self.config.write_mode
                    );
                }

                if self.config.delete_files_count_threshold.is_some() {
                    bail!(
                        "`compaction.delete_files_count_threshold` is not supported for 'small-files' compaction type"
                    );
                }
            }
            CompactionType::FilesWithDelete => {
                risingwave_common::license::Feature::IcebergCompaction
                    .check_available()
                    .map_err(|e| anyhow::anyhow!(e))?;

                if self.config.write_mode != IcebergWriteMode::MergeOnRead {
                    bail!(
                        "'files-with-delete' compaction type only supports 'merge-on-read' write mode, got: '{}'",
                        self.config.write_mode
                    );
                }

                if self.config.small_files_threshold_mb.is_some() {
                    bail!(
                        "`compaction.small_files_threshold_mb` must not be set for 'files-with-delete' compaction type"
                    );
                }
            }
            CompactionType::Full => {}
        }

        let _ = self.create_and_validate_table().await?;
        Ok(())
    }

    fn support_schema_change() -> bool {
        true
    }

    fn validate_alter_config(config: &BTreeMap<String, String>) -> Result<()> {
        let iceberg_config = IcebergConfig::from_btreemap(config.clone())?;

        if let Some(compaction_interval) = iceberg_config.compaction_interval_sec {
            if iceberg_config.enable_compaction && compaction_interval == 0 {
                bail!(
                    "`compaction_interval_sec` must be greater than 0 when `enable_compaction` is true"
                );
            }

            tracing::info!(
                "Alter config compaction_interval set to {} seconds",
                compaction_interval
            );
        }

        if let Some(max_snapshots) = iceberg_config.max_snapshots_num_before_compaction
            && max_snapshots < 1
        {
            bail!(
                "`compaction.max_snapshots_num` must be greater than 0, got: {}",
                max_snapshots
            );
        }

        if let Some(target_file_size_mb) = iceberg_config.target_file_size_mb
            && target_file_size_mb == 0
        {
            bail!("`compaction.target_file_size_mb` must be greater than 0");
        }

        if let Some(max_row_group_rows) = iceberg_config.write_parquet_max_row_group_rows
            && max_row_group_rows == 0
        {
            bail!("`compaction.write_parquet_max_row_group_rows` must be greater than 0");
        }

        if let Some(ref compression) = iceberg_config.write_parquet_compression {
            let valid_codecs = [
                "uncompressed",
                "snappy",
                "gzip",
                "lzo",
                "brotli",
                "lz4",
                "zstd",
            ];
            if !valid_codecs.contains(&compression.to_lowercase().as_str()) {
                bail!(
                    "`compaction.write_parquet_compression` must be one of {:?}, got: {}",
                    valid_codecs,
                    compression
                );
            }
        }

        Ok(())
    }

    async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
        let writer = IcebergSinkWriter::new(
            self.config.clone(),
            self.param.clone(),
            writer_param.clone(),
            self.unique_column_ids.clone(),
        );

        let commit_checkpoint_interval =
            NonZeroU64::new(self.config.commit_checkpoint_interval).expect(
                "commit_checkpoint_interval should be greater than 0, and it should be checked in config validation",
            );
        let log_sinker = CoordinatedLogSinker::new(
            &writer_param,
            self.param.clone(),
            writer,
            commit_checkpoint_interval,
        )
        .await?;

        Ok(log_sinker)
    }

    fn is_coordinated_sink(&self) -> bool {
        true
    }

    async fn new_coordinator(
        &self,
        iceberg_compact_stat_sender: Option<UnboundedSender<IcebergSinkCompactionUpdate>>,
    ) -> Result<SinkCommitCoordinator> {
        let catalog = self.config.create_catalog().await?;
        let table = self.create_and_validate_table().await?;
        let coordinator = IcebergSinkCommitter {
            catalog,
            table,
            last_commit_epoch: 0,
            sink_id: self.param.sink_id,
            config: self.config.clone(),
            param: self.param.clone(),
            commit_retry_num: self.config.commit_retry_num,
            iceberg_compact_stat_sender,
        };
        if self.config.is_exactly_once.unwrap_or_default() {
            Ok(SinkCommitCoordinator::TwoPhase(Box::new(coordinator)))
        } else {
            Ok(SinkCommitCoordinator::SinglePhase(Box::new(coordinator)))
        }
    }
}

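/// Tracks how the extra partition column (if any) is projected out of incoming chunks.
/// (Doc comment added for clarity; the wording is an assumption based on how the variants are
/// used in `write_batch` below.)
/// - `None`: there is no extra partition column and chunks are written as-is.
/// - `Prepare(idx)`: the extra partition column index is known, but the projection vector has
///   not been materialized yet.
/// - `Done(vec)`: the cached projection that drops the extra partition column.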
enum ProjectIdxVec {
    None,
    Prepare(usize),
    Done(Vec<usize>),
}

type DataFileWriterBuilderType =
    DataFileWriterBuilder<ParquetWriterBuilder, DefaultLocationGenerator, DefaultFileNameGenerator>;
type PositionDeleteFileWriterBuilderType = PositionDeleteFileWriterBuilder<
    ParquetWriterBuilder,
    DefaultLocationGenerator,
    DefaultFileNameGenerator,
>;
type PositionDeleteFileWriterType = PositionDeleteFileWriter<
    ParquetWriterBuilder,
    DefaultLocationGenerator,
    DefaultFileNameGenerator,
>;
type DeletionVectorWriterBuilderType =
    DeletionVectorWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>;
type DeletionVectorWriterType =
    DeletionVectorWriter<DefaultLocationGenerator, DefaultFileNameGenerator>;
type EqualityDeleteFileWriterBuilderType = EqualityDeleteFileWriterBuilder<
    ParquetWriterBuilder,
    DefaultLocationGenerator,
    DefaultFileNameGenerator,
>;

#[derive(Clone)]
enum PositionDeleteWriterBuilderType {
    PositionDelete(PositionDeleteFileWriterBuilderType),
    DeletionVector(DeletionVectorWriterBuilderType),
}

enum PositionDeleteWriterType {
    PositionDelete(PositionDeleteFileWriterType),
    DeletionVector(DeletionVectorWriterType),
}

#[async_trait]
impl IcebergWriterBuilder<Vec<PositionDeleteInput>> for PositionDeleteWriterBuilderType {
    type R = PositionDeleteWriterType;

    async fn build(
        &self,
        partition_key: Option<iceberg::spec::PartitionKey>,
    ) -> iceberg::Result<Self::R> {
        match self {
            PositionDeleteWriterBuilderType::PositionDelete(builder) => Ok(
                PositionDeleteWriterType::PositionDelete(builder.build(partition_key).await?),
            ),
            PositionDeleteWriterBuilderType::DeletionVector(builder) => Ok(
                PositionDeleteWriterType::DeletionVector(builder.build(partition_key).await?),
            ),
        }
    }
}

#[async_trait]
impl IcebergWriter<Vec<PositionDeleteInput>> for PositionDeleteWriterType {
    async fn write(&mut self, input: Vec<PositionDeleteInput>) -> iceberg::Result<()> {
        match self {
            PositionDeleteWriterType::PositionDelete(writer) => writer.write(input).await,
            PositionDeleteWriterType::DeletionVector(writer) => writer.write(input).await,
        }
    }

    async fn close(&mut self) -> iceberg::Result<Vec<DataFile>> {
        match self {
            PositionDeleteWriterType::PositionDelete(writer) => writer.close().await,
            PositionDeleteWriterType::DeletionVector(writer) => writer.close().await,
        }
    }
}

#[derive(Clone)]
struct SharedIcebergWriterBuilder<B>(Arc<B>);

#[async_trait]
impl<B: IcebergWriterBuilder> IcebergWriterBuilder for SharedIcebergWriterBuilder<B> {
    type R = B::R;

    async fn build(
        &self,
        partition_key: Option<iceberg::spec::PartitionKey>,
    ) -> iceberg::Result<Self::R> {
        self.0.build(partition_key).await
    }
}

#[derive(Clone)]
struct TaskWriterBuilderWrapper<B: IcebergWriterBuilder> {
    inner: Arc<B>,
    fanout_enabled: bool,
    schema: IcebergSchemaRef,
    partition_spec: PartitionSpecRef,
    compute_partition: bool,
}

impl<B: IcebergWriterBuilder> TaskWriterBuilderWrapper<B> {
    fn new(
        inner: B,
        fanout_enabled: bool,
        schema: IcebergSchemaRef,
        partition_spec: PartitionSpecRef,
        compute_partition: bool,
    ) -> Self {
        Self {
            inner: Arc::new(inner),
            fanout_enabled,
            schema,
            partition_spec,
            compute_partition,
        }
    }

    fn build(&self) -> iceberg::Result<TaskWriter<SharedIcebergWriterBuilder<B>>> {
        let partition_splitter = match (
            self.partition_spec.is_unpartitioned(),
            self.compute_partition,
        ) {
            (true, _) => None,
            (false, true) => Some(RecordBatchPartitionSplitter::try_new_with_computed_values(
                self.schema.clone(),
                self.partition_spec.clone(),
            )?),
            (false, false) => Some(
                RecordBatchPartitionSplitter::try_new_with_precomputed_values(
                    self.schema.clone(),
                    self.partition_spec.clone(),
                )?,
            ),
        };

        Ok(TaskWriter::new_with_partition_splitter(
            SharedIcebergWriterBuilder(self.inner.clone()),
            self.fanout_enabled,
            self.schema.clone(),
            self.partition_spec.clone(),
            partition_splitter,
        ))
    }
}
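// Summary of the dispatch above (comment added for clarity, not in the original source): an
// unpartitioned table needs no splitter; a partitioned table gets a splitter that either computes
// partition values from the record batch (`compute_partition == true`) or reuses precomputed
// values carried in the batch. `fanout_enabled` mirrors whether the table has any partition
// fields at the call sites, so partitioned writes are fanned out per partition by the task writer.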

pub enum IcebergSinkWriter {
    Created(IcebergSinkWriterArgs),
    Initialized(IcebergSinkWriterInner),
}

pub struct IcebergSinkWriterArgs {
    config: IcebergConfig,
    sink_param: SinkParam,
    writer_param: SinkWriterParam,
    unique_column_ids: Option<Vec<usize>>,
}

pub struct IcebergSinkWriterInner {
    writer: IcebergWriterDispatch,
    arrow_schema: SchemaRef,
    metrics: IcebergWriterMetrics,
    table: Table,
    project_idx_vec: ProjectIdxVec,
    commit_checkpoint_size_threshold_bytes: Option<u64>,
    uncommitted_write_bytes: u64,
}

#[allow(clippy::type_complexity)]
enum IcebergWriterDispatch {
    Append {
        writer: Option<Box<dyn IcebergWriter>>,
        writer_builder:
            TaskWriterBuilderWrapper<MonitoredGeneralWriterBuilder<DataFileWriterBuilderType>>,
    },
    Upsert {
        writer: Option<Box<dyn IcebergWriter>>,
        writer_builder: TaskWriterBuilderWrapper<
            MonitoredGeneralWriterBuilder<
                DeltaWriterBuilder<
                    DataFileWriterBuilderType,
                    PositionDeleteWriterBuilderType,
                    EqualityDeleteFileWriterBuilderType,
                >,
            >,
        >,
        arrow_schema_with_op_column: SchemaRef,
    },
}

impl IcebergWriterDispatch {
    pub fn get_writer(&mut self) -> Option<&mut Box<dyn IcebergWriter>> {
        match self {
            IcebergWriterDispatch::Append { writer, .. }
            | IcebergWriterDispatch::Upsert { writer, .. } => writer.as_mut(),
        }
    }
}

pub struct IcebergWriterMetrics {
    _write_qps: LabelGuardedIntCounter,
    _write_latency: LabelGuardedHistogram,
    write_bytes: LabelGuardedIntCounter,
}

impl IcebergSinkWriter {
    pub fn new(
        config: IcebergConfig,
        sink_param: SinkParam,
        writer_param: SinkWriterParam,
        unique_column_ids: Option<Vec<usize>>,
    ) -> Self {
        Self::Created(IcebergSinkWriterArgs {
            config,
            sink_param,
            writer_param,
            unique_column_ids,
        })
    }
}

impl IcebergSinkWriterInner {
    fn should_commit_on_checkpoint(&self) -> bool {
        self.commit_checkpoint_size_threshold_bytes
            .is_some_and(|threshold| {
                self.uncommitted_write_bytes > 0 && self.uncommitted_write_bytes >= threshold
            })
    }
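    // Illustrative behavior (comment added for clarity, not in the original source): with the
    // default 128 MB threshold, a checkpoint is not forced to commit by this writer until at
    // least 134_217_728 uncommitted bytes have accumulated; once a close result is produced in
    // `barrier` below, `uncommitted_write_bytes` is reset to 0.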

    fn build_append_only(
        config: &IcebergConfig,
        table: Table,
        writer_param: &SinkWriterParam,
    ) -> Result<Self> {
        let SinkWriterParam {
            extra_partition_col_idx,
            actor_id,
            sink_id,
            sink_name,
            ..
        } = writer_param;
        let metrics_labels = [
            &actor_id.to_string(),
            &sink_id.to_string(),
            sink_name.as_str(),
        ];

        let write_qps = GLOBAL_SINK_METRICS
            .iceberg_write_qps
            .with_guarded_label_values(&metrics_labels);
        let write_latency = GLOBAL_SINK_METRICS
            .iceberg_write_latency
            .with_guarded_label_values(&metrics_labels);
        let _rolling_unflushed_data_file = GLOBAL_SINK_METRICS
            .iceberg_rolling_unflushed_data_file
            .with_guarded_label_values(&metrics_labels);
        let write_bytes = GLOBAL_SINK_METRICS
            .iceberg_write_bytes
            .with_guarded_label_values(&metrics_labels);

        let schema = table.metadata().current_schema();
        let partition_spec = table.metadata().default_partition_spec();
        let fanout_enabled = !partition_spec.fields().is_empty();
        let unique_uuid_suffix = Uuid::now_v7();

        let parquet_writer_properties = WriterProperties::builder()
            .set_compression(config.get_parquet_compression())
            .set_max_row_group_size(config.write_parquet_max_row_group_rows())
            .set_created_by(PARQUET_CREATED_BY.to_owned())
            .build();

        let parquet_writer_builder =
            ParquetWriterBuilder::new(parquet_writer_properties, schema.clone());
        let rolling_builder = RollingFileWriterBuilder::new(
            parquet_writer_builder,
            (config.target_file_size_mb() * 1024 * 1024) as usize,
            table.file_io().clone(),
            DefaultLocationGenerator::new(table.metadata().clone())
                .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
            DefaultFileNameGenerator::new(
                writer_param.actor_id.to_string(),
                Some(unique_uuid_suffix.to_string()),
                iceberg::spec::DataFileFormat::Parquet,
            ),
        );
        let data_file_builder = DataFileWriterBuilder::new(rolling_builder);
        let monitored_builder = MonitoredGeneralWriterBuilder::new(
            data_file_builder,
            write_qps.clone(),
            write_latency.clone(),
        );
        let writer_builder = TaskWriterBuilderWrapper::new(
            monitored_builder,
            fanout_enabled,
            schema.clone(),
            partition_spec.clone(),
            true,
        );
        let inner_writer = Some(Box::new(
            writer_builder
                .build()
                .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
        ) as Box<dyn IcebergWriter>);
        Ok(Self {
            arrow_schema: Arc::new(
                schema_to_arrow_schema(table.metadata().current_schema())
                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
            ),
            metrics: IcebergWriterMetrics {
                _write_qps: write_qps,
                _write_latency: write_latency,
                write_bytes,
            },
            writer: IcebergWriterDispatch::Append {
                writer: inner_writer,
                writer_builder,
            },
            table,
            project_idx_vec: {
                if let Some(extra_partition_col_idx) = extra_partition_col_idx {
                    ProjectIdxVec::Prepare(*extra_partition_col_idx)
                } else {
                    ProjectIdxVec::None
                }
            },
            commit_checkpoint_size_threshold_bytes: config.commit_checkpoint_size_threshold_bytes(),
            uncommitted_write_bytes: 0,
        })
    }

    fn build_upsert(
        config: &IcebergConfig,
        table: Table,
        unique_column_ids: Vec<usize>,
        writer_param: &SinkWriterParam,
    ) -> Result<Self> {
        let SinkWriterParam {
            extra_partition_col_idx,
            actor_id,
            sink_id,
            sink_name,
            ..
        } = writer_param;
        let metrics_labels = [
            &actor_id.to_string(),
            &sink_id.to_string(),
            sink_name.as_str(),
        ];
        let unique_column_ids: Vec<_> = unique_column_ids.into_iter().map(|id| id as i32).collect();

        let write_qps = GLOBAL_SINK_METRICS
            .iceberg_write_qps
            .with_guarded_label_values(&metrics_labels);
        let write_latency = GLOBAL_SINK_METRICS
            .iceberg_write_latency
            .with_guarded_label_values(&metrics_labels);
        let _rolling_unflushed_data_file = GLOBAL_SINK_METRICS
            .iceberg_rolling_unflushed_data_file
            .with_guarded_label_values(&metrics_labels);
        let write_bytes = GLOBAL_SINK_METRICS
            .iceberg_write_bytes
            .with_guarded_label_values(&metrics_labels);

        let schema = table.metadata().current_schema();
        let partition_spec = table.metadata().default_partition_spec();
        let fanout_enabled = !partition_spec.fields().is_empty();
        let use_deletion_vectors = table.metadata().format_version() >= FormatVersion::V3;

        let unique_uuid_suffix = Uuid::now_v7();

        let parquet_writer_properties = WriterProperties::builder()
            .set_compression(config.get_parquet_compression())
            .set_max_row_group_size(config.write_parquet_max_row_group_rows())
            .set_created_by(PARQUET_CREATED_BY.to_owned())
            .build();

        let data_file_builder = {
            let parquet_writer_builder =
                ParquetWriterBuilder::new(parquet_writer_properties.clone(), schema.clone());
            let rolling_writer_builder = RollingFileWriterBuilder::new(
                parquet_writer_builder,
                (config.target_file_size_mb() * 1024 * 1024) as usize,
                table.file_io().clone(),
                DefaultLocationGenerator::new(table.metadata().clone())
                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
                DefaultFileNameGenerator::new(
                    writer_param.actor_id.to_string(),
                    Some(unique_uuid_suffix.to_string()),
                    iceberg::spec::DataFileFormat::Parquet,
                ),
            );
            DataFileWriterBuilder::new(rolling_writer_builder)
        };
        let position_delete_builder = if use_deletion_vectors {
            let location_generator = DefaultLocationGenerator::new(table.metadata().clone())
                .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
            PositionDeleteWriterBuilderType::DeletionVector(DeletionVectorWriterBuilder::new(
                table.file_io().clone(),
                location_generator,
                DefaultFileNameGenerator::new(
                    writer_param.actor_id.to_string(),
                    Some(format!("delvec-{}", unique_uuid_suffix)),
                    iceberg::spec::DataFileFormat::Puffin,
                ),
            ))
        } else {
            let parquet_writer_builder = ParquetWriterBuilder::new(
                parquet_writer_properties.clone(),
                POSITION_DELETE_SCHEMA.clone().into(),
            );
            let rolling_writer_builder = RollingFileWriterBuilder::new(
                parquet_writer_builder,
                (config.target_file_size_mb() * 1024 * 1024) as usize,
                table.file_io().clone(),
                DefaultLocationGenerator::new(table.metadata().clone())
                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
                DefaultFileNameGenerator::new(
                    writer_param.actor_id.to_string(),
                    Some(format!("pos-del-{}", unique_uuid_suffix)),
                    iceberg::spec::DataFileFormat::Parquet,
                ),
            );
            PositionDeleteWriterBuilderType::PositionDelete(PositionDeleteFileWriterBuilder::new(
                rolling_writer_builder,
            ))
        };
        let equality_delete_builder = {
            let eq_del_config = EqualityDeleteWriterConfig::new(
                unique_column_ids.clone(),
                table.metadata().current_schema().clone(),
            )
            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
            let parquet_writer_builder = ParquetWriterBuilder::new(
                parquet_writer_properties,
                Arc::new(
                    arrow_schema_to_schema(eq_del_config.projected_arrow_schema_ref())
                        .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
                ),
            );
            let rolling_writer_builder = RollingFileWriterBuilder::new(
                parquet_writer_builder,
                (config.target_file_size_mb() * 1024 * 1024) as usize,
                table.file_io().clone(),
                DefaultLocationGenerator::new(table.metadata().clone())
                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
                DefaultFileNameGenerator::new(
                    writer_param.actor_id.to_string(),
                    Some(format!("eq-del-{}", unique_uuid_suffix)),
                    iceberg::spec::DataFileFormat::Parquet,
                ),
            );

            EqualityDeleteFileWriterBuilder::new(rolling_writer_builder, eq_del_config)
        };
        let delta_builder = DeltaWriterBuilder::new(
            data_file_builder,
            position_delete_builder,
            equality_delete_builder,
            unique_column_ids,
            schema.clone(),
        );
        let original_arrow_schema = Arc::new(
            schema_to_arrow_schema(table.metadata().current_schema())
                .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
        );
        let schema_with_extra_op_column = {
            let mut new_fields = original_arrow_schema.fields().iter().cloned().collect_vec();
            new_fields.push(Arc::new(ArrowField::new(
                "op".to_owned(),
                ArrowDataType::Int32,
                false,
            )));
            Arc::new(ArrowSchema::new(new_fields))
        };
        let writer_builder = TaskWriterBuilderWrapper::new(
            MonitoredGeneralWriterBuilder::new(
                delta_builder,
                write_qps.clone(),
                write_latency.clone(),
            ),
            fanout_enabled,
            schema.clone(),
            partition_spec.clone(),
            true,
        );
        let inner_writer = Some(Box::new(
            writer_builder
                .build()
                .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
        ) as Box<dyn IcebergWriter>);
        Ok(Self {
            arrow_schema: original_arrow_schema,
            metrics: IcebergWriterMetrics {
                _write_qps: write_qps,
                _write_latency: write_latency,
                write_bytes,
            },
            table,
            writer: IcebergWriterDispatch::Upsert {
                writer: inner_writer,
                writer_builder,
                arrow_schema_with_op_column: schema_with_extra_op_column,
            },
            project_idx_vec: {
                if let Some(extra_partition_col_idx) = extra_partition_col_idx {
                    ProjectIdxVec::Prepare(*extra_partition_col_idx)
                } else {
                    ProjectIdxVec::None
                }
            },
            commit_checkpoint_size_threshold_bytes: config.commit_checkpoint_size_threshold_bytes(),
            uncommitted_write_bytes: 0,
        })
    }
}
1711}
1712
1713#[async_trait]
1714impl SinkWriter for IcebergSinkWriter {
1715 type CommitMetadata = Option<SinkMetadata>;
1716
1717 async fn begin_epoch(&mut self, _epoch: u64) -> Result<()> {
1719 let Self::Created(args) = self else {
1720 return Ok(());
1721 };
1722
1723 let table = create_and_validate_table_impl(&args.config, &args.sink_param).await?;
1724 let inner = match &args.unique_column_ids {
1725 Some(unique_column_ids) => IcebergSinkWriterInner::build_upsert(
1726 &args.config,
1727 table,
1728 unique_column_ids.clone(),
1729 &args.writer_param,
1730 )?,
1731 None => {
1732 IcebergSinkWriterInner::build_append_only(&args.config, table, &args.writer_param)?
1733 }
1734 };
1735
1736 *self = IcebergSinkWriter::Initialized(inner);
1737 Ok(())
1738 }
1739
1740 async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> {
1742 let Self::Initialized(inner) = self else {
1743 unreachable!("IcebergSinkWriter should be initialized before barrier");
1744 };
1745
1746 match &mut inner.writer {
1748 IcebergWriterDispatch::Append {
1749 writer,
1750 writer_builder,
1751 } => {
1752 if writer.is_none() {
1753 *writer = Some(Box::new(
1754 writer_builder
1755 .build()
1756 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1757 ));
1758 }
1759 }
1760 IcebergWriterDispatch::Upsert {
1761 writer,
1762 writer_builder,
1763 ..
1764 } => {
1765 if writer.is_none() {
1766 *writer = Some(Box::new(
1767 writer_builder
1768 .build()
1769 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1770 ));
1771 }
1772 }
1773 };
1774
1775 let (mut chunk, ops) = chunk.compact_vis().into_parts();
1777 match &mut inner.project_idx_vec {
1778 ProjectIdxVec::None => {}
1779 ProjectIdxVec::Prepare(idx) => {
1780 if *idx >= chunk.columns().len() {
1781 return Err(SinkError::Iceberg(anyhow!(
1782 "invalid extra partition column index {}",
1783 idx
1784 )));
1785 }
1786 let project_idx_vec = (0..*idx)
1787 .chain(*idx + 1..chunk.columns().len())
1788 .collect_vec();
1789 chunk = chunk.project(&project_idx_vec);
1790 inner.project_idx_vec = ProjectIdxVec::Done(project_idx_vec);
1791 }
1792 ProjectIdxVec::Done(idx_vec) => {
1793 chunk = chunk.project(idx_vec);
1794 }
1795 }
1796 if ops.is_empty() {
1797 return Ok(());
1798 }
1799 let write_batch_size = chunk.estimated_heap_size();
1800 let batch = match &inner.writer {
1801 IcebergWriterDispatch::Append { .. } => {
1802 let filters =
1804 chunk.visibility() & ops.iter().map(|op| *op == Op::Insert).collect::<Bitmap>();
1805 chunk.set_visibility(filters);
1806 IcebergArrowConvert
1807 .to_record_batch(inner.arrow_schema.clone(), &chunk.compact_vis())
1808 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?
1809 }
1810 IcebergWriterDispatch::Upsert {
1811 arrow_schema_with_op_column,
1812 ..
1813 } => {
1814 let chunk = IcebergArrowConvert
1815 .to_record_batch(inner.arrow_schema.clone(), &chunk)
1816 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
1817 let ops = Arc::new(Int32Array::from(
1818 ops.iter()
1819 .map(|op| match op {
1820 Op::UpdateInsert | Op::Insert => INSERT_OP,
1821 Op::UpdateDelete | Op::Delete => DELETE_OP,
1822 })
1823 .collect_vec(),
1824 ));
1825 let mut columns = chunk.columns().to_vec();
1826 columns.push(ops);
1827 RecordBatch::try_new(arrow_schema_with_op_column.clone(), columns)
1828 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?
1829 }
1830 };
1831
1832 let writer = inner.writer.get_writer().unwrap();
1833 writer
1834 .write(batch)
1835 .instrument_await("iceberg_write")
1836 .await
1837 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
1838 inner.metrics.write_bytes.inc_by(write_batch_size as _);
1839 inner.uncommitted_write_bytes = inner
1840 .uncommitted_write_bytes
1841 .saturating_add(write_batch_size as u64);
1842 Ok(())
1843 }
1844
1845 fn should_commit_on_checkpoint(&self) -> bool {
1846 match self {
1847 Self::Initialized(inner) => inner.should_commit_on_checkpoint(),
1848 Self::Created(_) => false,
1849 }
1850 }
1851
1852 async fn barrier(&mut self, is_checkpoint: bool) -> Result<Option<SinkMetadata>> {
1855 let Self::Initialized(inner) = self else {
1856 unreachable!("IcebergSinkWriter should be initialized before barrier");
1857 };
1858
1859 if !is_checkpoint {
1861 return Ok(None);
1862 }
1863
1864 let close_result = match &mut inner.writer {
1865 IcebergWriterDispatch::Append {
1866 writer,
1867 writer_builder,
1868 } => {
1869 let close_result = match writer.take() {
1870 Some(mut writer) => {
1871 Some(writer.close().instrument_await("iceberg_close").await)
1872 }
1873 _ => None,
1874 };
1875 match writer_builder.build() {
1876 Ok(new_writer) => {
1877 *writer = Some(Box::new(new_writer));
1878 }
1879 _ => {
1880 warn!("Failed to build new writer after close");
1883 }
1884 }
1885 close_result
1886 }
1887 IcebergWriterDispatch::Upsert {
1888 writer,
1889 writer_builder,
1890 ..
1891 } => {
1892 let close_result = match writer.take() {
1893 Some(mut writer) => {
1894 Some(writer.close().instrument_await("iceberg_close").await)
1895 }
1896 _ => None,
1897 };
1898 match writer_builder.build() {
1899 Ok(new_writer) => {
1900 *writer = Some(Box::new(new_writer));
1901 }
1902 _ => {
1903 warn!("Failed to build new writer after close");
1906 }
1907 }
1908 close_result
1909 }
1910 };
1911
1912 match close_result {
1913 Some(Ok(result)) => {
1914 inner.uncommitted_write_bytes = 0;
1915 let format_version = inner.table.metadata().format_version();
1916 let partition_type = inner.table.metadata().default_partition_type();
1917 let data_files = result
1918 .into_iter()
1919 .map(|f| {
1920 let truncated = truncate_datafile(f);
1922 SerializedDataFile::try_from(truncated, partition_type, format_version)
1923 .map_err(|err| SinkError::Iceberg(anyhow!(err)))
1924 })
1925 .collect::<Result<Vec<_>>>()?;
1926 Ok(Some(SinkMetadata::try_from(&IcebergCommitResult {
1927 data_files,
1928 schema_id: inner.table.metadata().current_schema_id(),
1929 partition_spec_id: inner.table.metadata().default_partition_spec_id(),
1930 })?))
1931 }
1932 Some(Err(err)) => Err(SinkError::Iceberg(anyhow!(err))),
1933 None => Err(SinkError::Iceberg(anyhow!("No writer to close"))),
1934 }
1935 }
1936}
1937
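// JSON keys used when (de)serializing an `IcebergCommitResult` into sink metadata.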
1938const SCHEMA_ID: &str = "schema_id";
1939const PARTITION_SPEC_ID: &str = "partition_spec_id";
1940const DATA_FILES: &str = "data_files";
1941
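// `truncate_datafile` (below) drops any per-column lower/upper bound statistic whose
// serialized value exceeds `MAX_COLUMN_STAT_SIZE` bytes, keeping the commit metadata
// shipped to the coordinator small.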
1942const MAX_COLUMN_STAT_SIZE: usize = 10240;

fn truncate_datafile(mut data_file: DataFile) -> DataFile {
1966 data_file.lower_bounds.retain(|field_id, datum| {
1968 let size = match datum.to_bytes() {
1970 Ok(bytes) => bytes.len(),
1971 Err(_) => 0,
1972 };
1973
1974 if size > MAX_COLUMN_STAT_SIZE {
1975 tracing::debug!(
1976 field_id = field_id,
1977 size = size,
1978 "Truncating large lower_bound statistic"
1979 );
1980 return false;
1981 }
1982 true
1983 });
1984
1985 data_file.upper_bounds.retain(|field_id, datum| {
1987 let size = match datum.to_bytes() {
1989 Ok(bytes) => bytes.len(),
1990 Err(_) => 0,
1991 };
1992
1993 if size > MAX_COLUMN_STAT_SIZE {
1994 tracing::debug!(
1995 field_id = field_id,
1996 size = size,
1997 "Truncating large upper_bound statistic"
1998 );
1999 return false;
2000 }
2001 true
2002 });
2003
2004 data_file
2005}
2006
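/// Per-parallelism commit payload produced at checkpoint time: the data files written
/// since the last commit plus the schema id and partition spec id they were written with.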
2007#[derive(Default, Clone)]
2008struct IcebergCommitResult {
2009 schema_id: i32,
2010 partition_spec_id: i32,
2011 data_files: Vec<SerializedDataFile>,
2012}
2013
2014impl IcebergCommitResult {
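    /// Parses a commit result from the JSON object embedded in a writer's `SinkMetadata`.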
2015 fn try_from(value: &SinkMetadata) -> Result<Self> {
2016 if let Some(Serialized(v)) = &value.metadata {
2017 let mut values = if let serde_json::Value::Object(v) =
2018 serde_json::from_slice::<serde_json::Value>(&v.metadata)
2019 .context("Can't parse iceberg sink metadata")?
2020 {
2021 v
2022 } else {
2023 bail!("iceberg sink metadata should be an object");
2024 };
2025
2026 let schema_id;
2027 if let Some(serde_json::Value::Number(value)) = values.remove(SCHEMA_ID) {
2028 schema_id = value
2029 .as_u64()
2030 .ok_or_else(|| anyhow!("schema_id should be a u64"))?;
2031 } else {
2032 bail!("iceberg sink metadata should have schema_id");
2033 }
2034
2035 let partition_spec_id;
2036 if let Some(serde_json::Value::Number(value)) = values.remove(PARTITION_SPEC_ID) {
2037 partition_spec_id = value
2038 .as_u64()
2039 .ok_or_else(|| anyhow!("partition_spec_id should be a u64"))?;
2040 } else {
2041 bail!("iceberg sink metadata should have partition_spec_id");
2042 }
2043
2044 let data_files: Vec<SerializedDataFile>;
2045 if let serde_json::Value::Array(values) = values
2046 .remove(DATA_FILES)
2047 .ok_or_else(|| anyhow!("iceberg sink metadata should have data_files object"))?
2048 {
2049 data_files = values
2050 .into_iter()
2051 .map(from_value::<SerializedDataFile>)
2052                    .collect::<std::result::Result<_, _>>()
2053                    .context("Failed to deserialize data files in iceberg sink metadata")?;
2054 } else {
2055 bail!("iceberg sink metadata should have data_files object");
2056 }
2057
2058 Ok(Self {
2059 schema_id: schema_id as i32,
2060 partition_spec_id: partition_spec_id as i32,
2061 data_files,
2062 })
2063 } else {
2064 bail!("Can't create iceberg sink write result from empty data!")
2065 }
2066 }
2067
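    /// Like [`Self::try_from`], but parses the raw JSON bytes persisted by the
    /// two-phase commit coordinator.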
2068 fn try_from_serialized_bytes(value: Vec<u8>) -> Result<Self> {
2069 let mut values = if let serde_json::Value::Object(value) =
2070 serde_json::from_slice::<serde_json::Value>(&value)
2071 .context("Can't parse iceberg sink metadata")?
2072 {
2073 value
2074 } else {
2075 bail!("iceberg sink metadata should be an object");
2076 };
2077
2078 let schema_id;
2079 if let Some(serde_json::Value::Number(value)) = values.remove(SCHEMA_ID) {
2080 schema_id = value
2081 .as_u64()
2082 .ok_or_else(|| anyhow!("schema_id should be a u64"))?;
2083 } else {
2084 bail!("iceberg sink metadata should have schema_id");
2085 }
2086
2087 let partition_spec_id;
2088 if let Some(serde_json::Value::Number(value)) = values.remove(PARTITION_SPEC_ID) {
2089 partition_spec_id = value
2090 .as_u64()
2091 .ok_or_else(|| anyhow!("partition_spec_id should be a u64"))?;
2092 } else {
2093 bail!("iceberg sink metadata should have partition_spec_id");
2094 }
2095
2096 let data_files: Vec<SerializedDataFile>;
2097 if let serde_json::Value::Array(values) = values
2098 .remove(DATA_FILES)
2099 .ok_or_else(|| anyhow!("iceberg sink metadata should have data_files object"))?
2100 {
2101 data_files = values
2102 .into_iter()
2103 .map(from_value::<SerializedDataFile>)
2104                    .collect::<std::result::Result<_, _>>()
2105                    .context("Failed to deserialize data files in iceberg sink metadata")?;
2106 } else {
2107 bail!("iceberg sink metadata should have data_files object");
2108 }
2109
2110 Ok(Self {
2111 schema_id: schema_id as i32,
2112 partition_spec_id: partition_spec_id as i32,
2113 data_files,
2114 })
2115 }
2116}
2117
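// Serialization side: a commit result is encoded as the JSON object
// `{ "schema_id": .., "partition_spec_id": .., "data_files": [..] }`, either wrapped in
// `SinkMetadata` (writer -> coordinator) or as raw bytes (two-phase pre-commit log).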
2118impl<'a> TryFrom<&'a IcebergCommitResult> for SinkMetadata {
2119 type Error = SinkError;
2120
2121 fn try_from(value: &'a IcebergCommitResult) -> std::result::Result<SinkMetadata, Self::Error> {
2122 let json_data_files = serde_json::Value::Array(
2123 value
2124 .data_files
2125 .iter()
2126 .map(serde_json::to_value)
2127 .collect::<std::result::Result<Vec<serde_json::Value>, _>>()
2128 .context("Can't serialize data files to json")?,
2129 );
2130 let json_value = serde_json::Value::Object(
2131 vec![
2132 (
2133 SCHEMA_ID.to_owned(),
2134 serde_json::Value::Number(value.schema_id.into()),
2135 ),
2136 (
2137 PARTITION_SPEC_ID.to_owned(),
2138 serde_json::Value::Number(value.partition_spec_id.into()),
2139 ),
2140 (DATA_FILES.to_owned(), json_data_files),
2141 ]
2142 .into_iter()
2143 .collect(),
2144 );
2145 Ok(SinkMetadata {
2146 metadata: Some(Serialized(SerializedMetadata {
2147 metadata: serde_json::to_vec(&json_value)
2148 .context("Can't serialize iceberg sink metadata")?,
2149 })),
2150 })
2151 }
2152}
2153
2154impl TryFrom<IcebergCommitResult> for Vec<u8> {
2155 type Error = SinkError;
2156
2157 fn try_from(value: IcebergCommitResult) -> std::result::Result<Vec<u8>, Self::Error> {
2158 let json_data_files = serde_json::Value::Array(
2159 value
2160 .data_files
2161 .iter()
2162 .map(serde_json::to_value)
2163 .collect::<std::result::Result<Vec<serde_json::Value>, _>>()
2164 .context("Can't serialize data files to json")?,
2165 );
2166 let json_value = serde_json::Value::Object(
2167 vec![
2168 (
2169 SCHEMA_ID.to_owned(),
2170 serde_json::Value::Number(value.schema_id.into()),
2171 ),
2172 (
2173 PARTITION_SPEC_ID.to_owned(),
2174 serde_json::Value::Number(value.partition_spec_id.into()),
2175 ),
2176 (DATA_FILES.to_owned(), json_data_files),
2177 ]
2178 .into_iter()
2179 .collect(),
2180 );
2181 Ok(serde_json::to_vec(&json_value).context("Can't serialize iceberg sink metadata")?)
2182 }
2183}
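/// Commit coordinator of the iceberg sink: collects the writers' per-checkpoint results
/// and turns them into iceberg snapshots, supporting both single-phase and exactly-once
/// two-phase commit.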
2184pub struct IcebergSinkCommitter {
2185 catalog: Arc<dyn Catalog>,
2186 table: Table,
2187 pub last_commit_epoch: u64,
2188 pub(crate) sink_id: SinkId,
2189 pub(crate) config: IcebergConfig,
2190 pub(crate) param: SinkParam,
2191 commit_retry_num: u32,
2192 pub(crate) iceberg_compact_stat_sender: Option<UnboundedSender<IcebergSinkCompactionUpdate>>,
2193}
2194
2195impl IcebergSinkCommitter {
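    /// Reloads the table from the catalog and rejects the commit if its schema or
    /// default partition spec evolved since the data files were written.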
2196 async fn reload_table(
2199 catalog: &dyn Catalog,
2200 table_ident: &TableIdent,
2201 schema_id: i32,
2202 partition_spec_id: i32,
2203 ) -> Result<Table> {
2204 let table = catalog
2205 .load_table(table_ident)
2206 .await
2207 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
2208 if table.metadata().current_schema_id() != schema_id {
2209 return Err(SinkError::Iceberg(anyhow!(
2210                "Schema evolution not supported, expected schema id {}, but got {}",
2211 schema_id,
2212 table.metadata().current_schema_id()
2213 )));
2214 }
2215 if table.metadata().default_partition_spec_id() != partition_spec_id {
2216 return Err(SinkError::Iceberg(anyhow!(
2217                "Partition evolution not supported, expected partition spec id {}, but got {}",
2218 partition_spec_id,
2219 table.metadata().default_partition_spec_id()
2220 )));
2221 }
2222 Ok(table)
2223 }
2224}
2225
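// Single-phase path: data files are committed directly once the checkpoint completes.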
2226#[async_trait]
2227impl SinglePhaseCommitCoordinator for IcebergSinkCommitter {
2228 async fn init(&mut self) -> Result<()> {
2229 tracing::info!(
2230 sink_id = %self.param.sink_id,
2231 "Iceberg sink coordinator initialized",
2232 );
2233
2234 Ok(())
2235 }
2236
2237 async fn commit_data(&mut self, epoch: u64, metadata: Vec<SinkMetadata>) -> Result<()> {
2238 tracing::debug!("Starting iceberg direct commit in epoch {epoch}");
2239
2240 if metadata.is_empty() {
2241 tracing::debug!(?epoch, "No datafile to commit");
2242 return Ok(());
2243 }
2244
2245 if let Some((write_results, snapshot_id)) = self.pre_commit_inner(epoch, metadata)? {
2247 self.commit_data_impl(epoch, write_results, snapshot_id)
2248 .await?;
2249 }
2250
2251 Ok(())
2252 }
2253
2254 async fn commit_schema_change(
2255 &mut self,
2256 epoch: u64,
2257 schema_change: PbSinkSchemaChange,
2258 ) -> Result<()> {
2259 tracing::info!(
2260 "Committing schema change {:?} in epoch {}",
2261 schema_change,
2262 epoch
2263 );
2264 self.commit_schema_change_impl(schema_change).await?;
2265 tracing::info!("Successfully committed schema change in epoch {}", epoch);
2266
2267 Ok(())
2268 }
2269}
2270
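// Two-phase (exactly-once) path: `pre_commit` persists the serialized write results plus a
// pre-allocated snapshot id, so `commit_data` can be replayed idempotently after recovery by
// checking whether that snapshot already exists in the table.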
2271#[async_trait]
2272impl TwoPhaseCommitCoordinator for IcebergSinkCommitter {
2273 async fn init(&mut self) -> Result<()> {
2274 tracing::info!(
2275 sink_id = %self.param.sink_id,
2276 "Iceberg sink coordinator initialized",
2277 );
2278
2279 Ok(())
2280 }
2281
2282 async fn pre_commit(
2283 &mut self,
2284 epoch: u64,
2285 metadata: Vec<SinkMetadata>,
2286 _schema_change: Option<PbSinkSchemaChange>,
2287 ) -> Result<Option<Vec<u8>>> {
2288 tracing::debug!("Starting iceberg pre commit in epoch {epoch}");
2289
2290 let (write_results, snapshot_id) = match self.pre_commit_inner(epoch, metadata)? {
2291 Some((write_results, snapshot_id)) => (write_results, snapshot_id),
2292 None => {
2293 tracing::debug!(?epoch, "no data to pre commit");
2294 return Ok(None);
2295 }
2296 };
2297
2298 let mut write_results_bytes = Vec::new();
2299 for each_parallelism_write_result in write_results {
2300 let each_parallelism_write_result_bytes: Vec<u8> =
2301 each_parallelism_write_result.try_into()?;
2302 write_results_bytes.push(each_parallelism_write_result_bytes);
2303 }
2304
2305 let snapshot_id_bytes: Vec<u8> = snapshot_id.to_le_bytes().to_vec();
2306 write_results_bytes.push(snapshot_id_bytes);
2307
2308 let pre_commit_metadata_bytes: Vec<u8> = serialize_metadata(write_results_bytes);
2309 Ok(Some(pre_commit_metadata_bytes))
2310 }
2311
2312 async fn commit_data(&mut self, epoch: u64, commit_metadata: Vec<u8>) -> Result<()> {
2313 tracing::debug!("Starting iceberg commit in epoch {epoch}");
2314
2315 if commit_metadata.is_empty() {
2316 tracing::debug!(?epoch, "No datafile to commit");
2317 return Ok(());
2318 }
2319
2320 let mut payload = deserialize_metadata(commit_metadata);
2322 if payload.is_empty() {
2323 return Err(SinkError::Iceberg(anyhow!(
2324 "Invalid commit metadata: empty payload"
2325 )));
2326 }
2327
2328 let snapshot_id_bytes = payload.pop().ok_or_else(|| {
2330 SinkError::Iceberg(anyhow!("Invalid commit metadata: missing snapshot_id"))
2331 })?;
2332 let snapshot_id = i64::from_le_bytes(
2333 snapshot_id_bytes
2334 .try_into()
2335 .map_err(|_| SinkError::Iceberg(anyhow!("Invalid snapshot id bytes")))?,
2336 );
2337
2338 let write_results = payload
2340 .into_iter()
2341 .map(IcebergCommitResult::try_from_serialized_bytes)
2342 .collect::<Result<Vec<_>>>()?;
2343
2344 let snapshot_committed = self
2345 .is_snapshot_id_in_iceberg(&self.config, snapshot_id)
2346 .await?;
2347
2348 if snapshot_committed {
2349 tracing::info!(
2350 "Snapshot id {} already committed in iceberg table, skip committing again.",
2351 snapshot_id
2352 );
2353 return Ok(());
2354 }
2355
2356 self.commit_data_impl(epoch, write_results, snapshot_id)
2357 .await
2358 }
2359
2360 async fn commit_schema_change(
2361 &mut self,
2362 epoch: u64,
2363 schema_change: PbSinkSchemaChange,
2364 ) -> Result<()> {
2365 let schema_updated = self.check_schema_change_applied(&schema_change)?;
2366 if schema_updated {
2367 tracing::info!("Schema change already committed in epoch {}, skip", epoch);
2368 return Ok(());
2369 }
2370
2371 tracing::info!(
2372 "Committing schema change {:?} in epoch {}",
2373 schema_change,
2374 epoch
2375 );
2376 self.commit_schema_change_impl(schema_change).await?;
2377 tracing::info!("Successfully committed schema change in epoch {epoch}");
2378
2379 Ok(())
2380 }
2381
2382 async fn abort(&mut self, _epoch: u64, _commit_metadata: Vec<u8>) {
2383 tracing::debug!("Abort not implemented yet");
2385 }
2386}
2387
2388impl IcebergSinkCommitter {
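    /// Deserializes the per-parallelism write results, verifies they all share the same
    /// schema id and partition spec id, and pre-allocates the snapshot id for the commit.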
2390 fn pre_commit_inner(
2391 &mut self,
2392 _epoch: u64,
2393 metadata: Vec<SinkMetadata>,
2394 ) -> Result<Option<(Vec<IcebergCommitResult>, i64)>> {
2395 let write_results: Vec<IcebergCommitResult> = metadata
2396 .iter()
2397 .map(IcebergCommitResult::try_from)
2398 .collect::<Result<Vec<IcebergCommitResult>>>()?;
2399
2400 if write_results.is_empty() || write_results.iter().all(|r| r.data_files.is_empty()) {
2402 return Ok(None);
2403 }
2404
2405 let expect_schema_id = write_results[0].schema_id;
2406 let expect_partition_spec_id = write_results[0].partition_spec_id;
2407
2408 if write_results
2410 .iter()
2411 .any(|r| r.schema_id != expect_schema_id)
2412 || write_results
2413 .iter()
2414 .any(|r| r.partition_spec_id != expect_partition_spec_id)
2415 {
2416 return Err(SinkError::Iceberg(anyhow!(
2417 "schema_id and partition_spec_id should be the same in all write results"
2418 )));
2419 }
2420
2421 let snapshot_id = FastAppendAction::generate_snapshot_id(&self.table);
2422
2423 Ok(Some((write_results, snapshot_id)))
2424 }
2425
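    /// Converts the serialized data files back into `DataFile`s and commits them to the
    /// target branch as a single fast-append snapshot, retrying transient commit failures
    /// with exponential backoff while treating table reload failures as non-retriable.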
2426 async fn commit_data_impl(
2427 &mut self,
2428 epoch: u64,
2429 write_results: Vec<IcebergCommitResult>,
2430 snapshot_id: i64,
2431 ) -> Result<()> {
2432 assert!(
2434 !write_results.is_empty() && !write_results.iter().all(|r| r.data_files.is_empty())
2435 );
2436
2437 self.wait_for_snapshot_limit().await?;
2439
2440 let expect_schema_id = write_results[0].schema_id;
2441 let expect_partition_spec_id = write_results[0].partition_spec_id;
2442
2443 self.table = Self::reload_table(
2445 self.catalog.as_ref(),
2446 self.table.identifier(),
2447 expect_schema_id,
2448 expect_partition_spec_id,
2449 )
2450 .await?;
2451
2452 let Some(schema) = self.table.metadata().schema_by_id(expect_schema_id) else {
2453 return Err(SinkError::Iceberg(anyhow!(
2454 "Can't find schema by id {}",
2455 expect_schema_id
2456 )));
2457 };
2458 let Some(partition_spec) = self
2459 .table
2460 .metadata()
2461 .partition_spec_by_id(expect_partition_spec_id)
2462 else {
2463 return Err(SinkError::Iceberg(anyhow!(
2464 "Can't find partition spec by id {}",
2465 expect_partition_spec_id
2466 )));
2467 };
2468 let partition_type = partition_spec
2469 .as_ref()
2470 .clone()
2471 .partition_type(schema)
2472 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
2473
2474 let data_files = write_results
2475 .into_iter()
2476 .flat_map(|r| {
2477 r.data_files.into_iter().map(|f| {
2478 f.try_into(expect_partition_spec_id, &partition_type, schema)
2479 .map_err(|err| SinkError::Iceberg(anyhow!(err)))
2480 })
2481 })
2482 .collect::<Result<Vec<DataFile>>>()?;
2483 let retry_strategy = ExponentialBackoff::from_millis(10)
2488 .max_delay(Duration::from_secs(60))
2489 .map(jitter)
2490 .take(self.commit_retry_num as usize);
2491 let catalog = self.catalog.clone();
2492 let table_ident = self.table.identifier().clone();
2493
2494 enum CommitError {
2499            ReloadTable(SinkError),
            Commit(SinkError),
        }
2502
2503 let table = RetryIf::spawn(
2504 retry_strategy,
2505 || async {
2506 let table = Self::reload_table(
2508 catalog.as_ref(),
2509 &table_ident,
2510 expect_schema_id,
2511 expect_partition_spec_id,
2512 )
2513 .await
2514 .map_err(|e| {
2515 tracing::error!(error = %e.as_report(), "Failed to reload iceberg table");
2516 CommitError::ReloadTable(e)
2517 })?;
2518
2519 let txn = Transaction::new(&table);
2520 let append_action = txn
2521 .fast_append()
2522 .set_snapshot_id(snapshot_id)
2523 .set_target_branch(commit_branch(
2524 self.config.r#type.as_str(),
2525 self.config.write_mode,
2526 ))
2527 .add_data_files(data_files.clone());
2528
2529 let tx = append_action.apply(txn).map_err(|err| {
2530 let err: IcebergError = err.into();
2531 tracing::error!(error = %err.as_report(), "Failed to apply iceberg table");
2532 CommitError::Commit(SinkError::Iceberg(anyhow!(err)))
2533 })?;
2534
2535 tx.commit(catalog.as_ref()).await.map_err(|err| {
2536 let err: IcebergError = err.into();
2537 tracing::error!(error = %err.as_report(), "Failed to commit iceberg table");
2538 CommitError::Commit(SinkError::Iceberg(anyhow!(err)))
2539 })
2540 },
2541 |err: &CommitError| {
2542 match err {
2544 CommitError::Commit(_) => {
2545 tracing::warn!("Commit failed, will retry");
2546 true
2547 }
2548 CommitError::ReloadTable(_) => {
2549 tracing::error!(
2550 "reload_table failed with non-retriable error, will not retry"
2551 );
2552 false
2553 }
2554 }
2555 },
2556 )
2557 .await
2558 .map_err(|e| match e {
2559 CommitError::ReloadTable(e) | CommitError::Commit(e) => e,
2560 })?;
2561 self.table = table;
2562
2563 let snapshot_num = self.table.metadata().snapshots().count();
2564 let catalog_name = self.config.common.catalog_name();
2565 let table_name = self.table.identifier().to_string();
2566 let metrics_labels = [&self.param.sink_name, &catalog_name, &table_name];
2567 GLOBAL_SINK_METRICS
2568 .iceberg_snapshot_num
2569 .with_guarded_label_values(&metrics_labels)
2570 .set(snapshot_num as i64);
2571
2572        tracing::debug!("Successfully committed to iceberg table in epoch {epoch}.");
2573
2574 if let Some(iceberg_compact_stat_sender) = &self.iceberg_compact_stat_sender
2575 && self.config.enable_compaction
2576 && iceberg_compact_stat_sender
2577 .send(IcebergSinkCompactionUpdate {
2578 sink_id: self.sink_id,
2579 compaction_interval: self.config.compaction_interval_sec(),
2580 force_compaction: false,
2581 })
2582 .is_err()
2583 {
2584 warn!("failed to send iceberg compaction stats");
2585 }
2586
2587 Ok(())
2588 }
2589
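    /// Returns whether `snapshot_id` is already present in the table, i.e. a previous
    /// attempt to commit this epoch succeeded and the commit can be skipped on recovery.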
2590 async fn is_snapshot_id_in_iceberg(
2594 &self,
2595 iceberg_config: &IcebergConfig,
2596 snapshot_id: i64,
2597 ) -> Result<bool> {
2598 let table = iceberg_config.load_table().await?;
2599        Ok(table.metadata().snapshot_by_id(snapshot_id).is_some())
2604 }
2605
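    /// Determines whether an `AddColumns` schema change has already been applied by
    /// comparing the table's current schema with the original schema and with the
    /// original schema plus the added columns.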
2606 fn check_schema_change_applied(&self, schema_change: &PbSinkSchemaChange) -> Result<bool> {
2609 let current_schema = self.table.metadata().current_schema();
2610 let current_arrow_schema = schema_to_arrow_schema(current_schema.as_ref())
2611 .context("Failed to convert schema")
2612 .map_err(SinkError::Iceberg)?;
2613
2614 let iceberg_arrow_convert = IcebergArrowConvert;
2615
2616 let schema_matches = |expected: &[ArrowField]| {
2617 if current_arrow_schema.fields().len() != expected.len() {
2618 return false;
2619 }
2620
2621 expected.iter().all(|expected_field| {
2622 current_arrow_schema.fields().iter().any(|current_field| {
2623 current_field.name() == expected_field.name()
2624 && current_field.data_type() == expected_field.data_type()
2625 })
2626 })
2627 };
2628
2629 let original_arrow_fields: Vec<ArrowField> = schema_change
2630 .original_schema
2631 .iter()
2632 .map(|pb_field| {
2633 let field = Field::from(pb_field);
2634 iceberg_arrow_convert
2635 .to_arrow_field(&field.name, &field.data_type)
2636 .context("Failed to convert field to arrow")
2637 .map_err(SinkError::Iceberg)
2638 })
2639 .collect::<Result<_>>()?;
2640
2641 if schema_matches(&original_arrow_fields) {
2643 tracing::debug!(
2644 "Current iceberg schema matches original_schema ({} columns); schema change not applied",
2645 original_arrow_fields.len()
2646 );
2647 return Ok(false);
2648 }
2649
2650 let Some(risingwave_pb::stream_plan::sink_schema_change::Op::AddColumns(add_columns_op)) =
2652 schema_change.op.as_ref()
2653 else {
2654 return Err(SinkError::Iceberg(anyhow!(
2655 "Unsupported sink schema change op in iceberg sink: {:?}",
2656 schema_change.op
2657 )));
2658 };
2659
2660 let add_arrow_fields: Vec<ArrowField> = add_columns_op
2661 .fields
2662 .iter()
2663 .map(|pb_field| {
2664 let field = Field::from(pb_field);
2665 iceberg_arrow_convert
2666 .to_arrow_field(&field.name, &field.data_type)
2667 .context("Failed to convert field to arrow")
2668 .map_err(SinkError::Iceberg)
2669 })
2670 .collect::<Result<_>>()?;
2671
2672 let mut expected_after_change = original_arrow_fields;
2673 expected_after_change.extend(add_arrow_fields);
2674
2675 if schema_matches(&expected_after_change) {
2677 tracing::debug!(
2678 "Current iceberg schema matches original_schema + add_columns ({} columns); schema change already applied",
2679 expected_after_change.len()
2680 );
2681 return Ok(true);
2682 }
2683
2684 Err(SinkError::Iceberg(anyhow!(
2685 "Current iceberg schema does not match either original_schema ({} cols) or original_schema + add_columns; cannot determine whether schema change is applied",
2686 schema_change.original_schema.len()
2687 )))
2688 }
2689
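    /// Applies an `AddColumns` schema change: assigns fresh field ids to the new columns
    /// and commits an `update_schema` transaction through the catalog.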
2690 async fn commit_schema_change_impl(&mut self, schema_change: PbSinkSchemaChange) -> Result<()> {
2694 use iceberg::spec::NestedField;
2695
2696 let Some(risingwave_pb::stream_plan::sink_schema_change::Op::AddColumns(add_columns_op)) =
2697 schema_change.op.as_ref()
2698 else {
2699 return Err(SinkError::Iceberg(anyhow!(
2700 "Unsupported sink schema change op in iceberg sink: {:?}",
2701 schema_change.op
2702 )));
2703 };
2704
2705 let add_columns = add_columns_op.fields.iter().map(Field::from).collect_vec();
2706
2707 let metadata = self.table.metadata();
2709 let mut next_field_id = metadata.last_column_id() + 1;
2710 tracing::debug!("Starting schema change, next_field_id: {}", next_field_id);
2711
2712 let iceberg_create_table_arrow_convert = IcebergCreateTableArrowConvert::default();
2714 let mut new_fields = Vec::new();
2715
2716 for field in &add_columns {
2717 let arrow_field = iceberg_create_table_arrow_convert
2719 .to_arrow_field(&field.name, &field.data_type)
2720 .with_context(|| format!("Failed to convert field '{}' to arrow", field.name))
2721 .map_err(SinkError::Iceberg)?;
2722
2723 let iceberg_type = iceberg::arrow::arrow_type_to_type(arrow_field.data_type())
2725 .map_err(|err| {
2726 SinkError::Iceberg(
2727 anyhow!(err).context("Failed to convert Arrow type to Iceberg type"),
2728 )
2729 })?;
2730
2731 let nested_field = Arc::new(NestedField::optional(
2733 next_field_id,
2734 &field.name,
2735 iceberg_type,
2736 ));
2737
2738 new_fields.push(nested_field);
2739 tracing::info!("Prepared field '{}' with ID {}", field.name, next_field_id);
2740 next_field_id += 1;
2741 }
2742
2743 tracing::info!(
2745 "Committing schema change to catalog for table {}",
2746 self.table.identifier()
2747 );
2748
2749 let txn = Transaction::new(&self.table);
2750 let action = txn.update_schema().add_fields(new_fields);
2751
2752 let updated_table = action
2753 .apply(txn)
2754 .context("Failed to apply schema update action")
2755 .map_err(SinkError::Iceberg)?
2756 .commit(self.catalog.as_ref())
2757 .await
2758 .context("Failed to commit table schema change")
2759 .map_err(SinkError::Iceberg)?;
2760
2761 self.table = updated_table;
2762
2763 tracing::info!(
2764 "Successfully committed schema change, added {} columns to iceberg table",
2765 add_columns.len()
2766 );
2767
2768 Ok(())
2769 }
2770
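    /// Counts snapshots, newest first, committed since the most recent `Replace`
    /// (compaction/rewrite) snapshot.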
2771 fn count_snapshots_since_rewrite(&self) -> usize {
2774 let mut snapshots: Vec<_> = self.table.metadata().snapshots().collect();
2775 snapshots.sort_by_key(|b| std::cmp::Reverse(b.timestamp_ms()));
2776
2777 let mut count = 0;
2779 for snapshot in snapshots {
2780 let summary = snapshot.summary();
2782 match &summary.operation {
2783 Operation::Replace => {
2784 break;
2786 }
2787
2788 _ => {
2789 count += 1;
2791 }
2792 }
2793 }
2794
2795 count
2796 }
2797
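    /// When compaction is enabled with `max_snapshots_num_before_compaction`, requests a
    /// forced compaction and polls every 30 seconds until the number of snapshots since the
    /// last rewrite drops below the limit, throttling commits so compaction can keep up.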
2798 async fn wait_for_snapshot_limit(&mut self) -> Result<()> {
2800 if !self.config.enable_compaction {
2801 return Ok(());
2802 }
2803
2804 if let Some(max_snapshots) = self.config.max_snapshots_num_before_compaction {
2805 loop {
2806 let current_count = self.count_snapshots_since_rewrite();
2807
2808 if current_count < max_snapshots {
2809 tracing::info!(
2810 "Snapshot count check passed: {} < {}",
2811 current_count,
2812 max_snapshots
2813 );
2814 break;
2815 }
2816
2817 tracing::info!(
2818                    "Snapshot count {} has reached the limit {}, waiting for compaction...",
2819 current_count,
2820 max_snapshots
2821 );
2822
2823 if let Some(iceberg_compact_stat_sender) = &self.iceberg_compact_stat_sender
2824 && iceberg_compact_stat_sender
2825 .send(IcebergSinkCompactionUpdate {
2826 sink_id: self.sink_id,
2827 compaction_interval: self.config.compaction_interval_sec(),
2828 force_compaction: true,
2829 })
2830 .is_err()
2831 {
2832 tracing::warn!("failed to send iceberg compaction stats");
2833 }
2834
2835 tokio::time::sleep(Duration::from_secs(30)).await;
2837
2838 self.table = self.config.load_table().await?;
2840 }
2841 }
2842 Ok(())
2843 }
2844}
2845
2846const MAP_KEY: &str = "key";
2847const MAP_VALUE: &str = "value";
2848
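/// Collects the child RisingWave types of a struct/map/list column into `schema_fields`
/// and returns the corresponding arrow struct fields, recursing through arrow list and map
/// wrappers; returns `None` for leaf types.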
2849fn get_fields<'a>(
2850 our_field_type: &'a risingwave_common::types::DataType,
2851 data_type: &ArrowDataType,
2852 schema_fields: &mut HashMap<&'a str, &'a risingwave_common::types::DataType>,
2853) -> Option<ArrowFields> {
2854 match data_type {
2855 ArrowDataType::Struct(fields) => {
2856 match our_field_type {
2857 risingwave_common::types::DataType::Struct(struct_fields) => {
2858 struct_fields.iter().for_each(|(name, data_type)| {
2859 let res = schema_fields.insert(name, data_type);
2860 assert!(res.is_none())
2862 });
2863 }
2864 risingwave_common::types::DataType::Map(map_fields) => {
2865 schema_fields.insert(MAP_KEY, map_fields.key());
2866 schema_fields.insert(MAP_VALUE, map_fields.value());
2867 }
2868 risingwave_common::types::DataType::List(list) => {
2869 list.elem()
2870 .as_struct()
2871 .iter()
2872 .for_each(|(name, data_type)| {
2873 let res = schema_fields.insert(name, data_type);
2874 assert!(res.is_none())
2876 });
2877 }
2878 _ => {}
2879 };
2880 Some(fields.clone())
2881 }
2882 ArrowDataType::List(field) | ArrowDataType::Map(field, _) => {
2883 get_fields(our_field_type, field.data_type(), schema_fields)
2884 }
2885        _ => None,
    }
2887}
2888
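/// Checks that every arrow field of the iceberg schema has a compatible RisingWave type,
/// tolerating a few lossless representation differences (decimal precision, `Binary` vs
/// `LargeBinary`) and recursing into nested list, map, and struct types.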
2889fn check_compatibility(
2890 schema_fields: HashMap<&str, &risingwave_common::types::DataType>,
2891 fields: &ArrowFields,
2892) -> anyhow::Result<bool> {
2893 for arrow_field in fields {
2894 let our_field_type = schema_fields
2895 .get(arrow_field.name().as_str())
2896 .ok_or_else(|| anyhow!("Field {} not found in our schema", arrow_field.name()))?;
2897
2898 let converted_arrow_data_type = IcebergArrowConvert
2900 .to_arrow_field("", our_field_type)
2901 .map_err(|e| anyhow!(e))?
2902 .data_type()
2903 .clone();
2904
2905 let compatible = match (&converted_arrow_data_type, arrow_field.data_type()) {
2906 (ArrowDataType::Decimal128(_, _), ArrowDataType::Decimal128(_, _)) => true,
2907 (ArrowDataType::Binary, ArrowDataType::LargeBinary) => true,
2908 (ArrowDataType::LargeBinary, ArrowDataType::Binary) => true,
2909 (ArrowDataType::List(_), ArrowDataType::List(field))
2910 | (ArrowDataType::Map(_, _), ArrowDataType::Map(field, _)) => {
2911 let mut schema_fields = HashMap::new();
2912                match get_fields(our_field_type, field.data_type(), &mut schema_fields) {
2913                    Some(fields) => check_compatibility(schema_fields, &fields)?,
                    None => true,
                }
2914 }
2915 (ArrowDataType::Struct(_), ArrowDataType::Struct(fields)) => {
2917 let mut schema_fields = HashMap::new();
2918 our_field_type
2919 .as_struct()
2920 .iter()
2921 .for_each(|(name, data_type)| {
2922 let res = schema_fields.insert(name, data_type);
2923 assert!(res.is_none())
2925 });
2926 check_compatibility(schema_fields, fields)?
2927 }
2928 (left, right) => left.equals_datatype(right),
2936 };
2937 if !compatible {
2938 bail!(
2939 "field {}'s type is incompatible\nRisingWave converted data type: {}\niceberg's data type: {}",
2940 arrow_field.name(),
2941 converted_arrow_data_type,
2942 arrow_field.data_type()
2943 );
2944 }
2945 }
2946 Ok(true)
2947}
2948
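/// Verifies that the RisingWave schema and the iceberg (arrow) schema have the same number
/// of fields and pairwise-compatible types; field order is not significant.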
2949pub fn try_matches_arrow_schema(rw_schema: &Schema, arrow_schema: &ArrowSchema) -> Result<()> {
2951 if rw_schema.fields.len() != arrow_schema.fields().len() {
2952 bail!(
2953 "Schema length mismatch, risingwave is {}, and iceberg is {}",
2954 rw_schema.fields.len(),
2955 arrow_schema.fields.len()
2956 );
2957 }
2958
2959 let mut schema_fields = HashMap::new();
2960 rw_schema.fields.iter().for_each(|field| {
2961 let res = schema_fields.insert(field.name.as_str(), &field.data_type);
2962 assert!(res.is_none())
2964 });
2965
2966 check_compatibility(schema_fields, &arrow_schema.fields)?;
2967 Ok(())
2968}
2969
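// The two-phase coordinator bundles the per-parallelism commit payloads (plus the
// snapshot id) into a single JSON-encoded `Vec<Vec<u8>>`.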
2970fn serialize_metadata(metadata: Vec<Vec<u8>>) -> Vec<u8> {
2971 serde_json::to_vec(&metadata).unwrap()
2972}
2973
2974fn deserialize_metadata(bytes: Vec<u8>) -> Vec<Vec<u8>> {
2975 serde_json::from_slice(&bytes).unwrap()
2976}
2977
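/// Parses the `partition_by` sink option into `(column, Transform)` pairs. Accepted forms
/// are `column`, `transform(column)`, `transform(n,column)` and `transform(n, column)`,
/// e.g. `"v1, identity(v1), truncate(4,v2), bucket(5,v1), year(v3)"`.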
2978pub fn parse_partition_by_exprs(
2979 expr: String,
2980) -> std::result::Result<Vec<(String, Transform)>, anyhow::Error> {
2981 let re = Regex::new(r"(?<transform>\w+)(\(((?<n>\d+)?(?:,|(,\s)))?(?<field>\w+)\))?").unwrap();
2983 if !re.is_match(&expr) {
2984 bail!(format!(
2985 "Invalid partition fields: {}\nHINT: Supported formats are column, transform(column), transform(n,column), transform(n, column)",
2986 expr
2987 ))
2988 }
2989 let caps = re.captures_iter(&expr);
2990
2991 let mut partition_columns = vec![];
2992
2993 for mat in caps {
2994 let (column, transform) = if mat.name("n").is_none() && mat.name("field").is_none() {
2995 (&mat["transform"], Transform::Identity)
2996 } else {
2997 let mut func = mat["transform"].to_owned();
2998 if func == "bucket" || func == "truncate" {
2999 let n = &mat
3000 .name("n")
3001                .ok_or_else(|| anyhow!("`n` must be specified for `bucket` and `truncate` transforms"))?
3002 .as_str();
3003 func = format!("{func}[{n}]");
3004 }
3005 (
3006 &mat["field"],
3007 Transform::from_str(&func)
3008 .with_context(|| format!("invalid transform function {}", func))?,
3009 )
3010 };
3011 partition_columns.push((column.to_owned(), transform));
3012 }
3013 Ok(partition_columns)
3014}
3015
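/// Copy-on-write upsert sinks stage their appends on `ICEBERG_COW_BRANCH`; all other sinks
/// commit directly to the main branch.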
3016pub fn commit_branch(sink_type: &str, write_mode: IcebergWriteMode) -> String {
3017 if should_enable_iceberg_cow(sink_type, write_mode) {
3018 ICEBERG_COW_BRANCH.to_owned()
3019 } else {
3020 MAIN_BRANCH.to_owned()
3021 }
3022}
3023
3024pub fn should_enable_iceberg_cow(sink_type: &str, write_mode: IcebergWriteMode) -> bool {
3025 sink_type == SINK_TYPE_UPSERT && write_mode == IcebergWriteMode::CopyOnWrite
3026}
3027
3028impl crate::with_options::WithOptions for IcebergWriteMode {}
3029
3030impl crate::with_options::WithOptions for FormatVersion {}
3031
3032impl crate::with_options::WithOptions for CompactionType {}
3033
3034#[cfg(test)]
3035mod test {
3036 use std::collections::BTreeMap;
3037
3038 use risingwave_common::array::arrow::arrow_schema_iceberg::FieldRef as ArrowFieldRef;
3039 use risingwave_common::types::{DataType, MapType, StructType};
3040
3041 use crate::connector_common::{IcebergCommon, IcebergTableIdentifier};
3042 use crate::sink::decouple_checkpoint_log_sink::ICEBERG_DEFAULT_COMMIT_CHECKPOINT_INTERVAL;
3043 use crate::sink::iceberg::{
3044 COMMIT_CHECKPOINT_SIZE_THRESHOLD_MB, COMPACTION_INTERVAL_SEC, COMPACTION_MAX_SNAPSHOTS_NUM,
3045 CompactionType, ENABLE_COMPACTION, ENABLE_SNAPSHOT_EXPIRATION, FormatVersion,
3046 ICEBERG_DEFAULT_COMMIT_CHECKPOINT_SIZE_THRESHOLD_MB, IcebergConfig, IcebergWriteMode,
3047 SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_FILES, SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_META_DATA,
3048 SNAPSHOT_EXPIRATION_MAX_AGE_MILLIS, SNAPSHOT_EXPIRATION_RETAIN_LAST, WRITE_MODE,
3049 };
3050
3051    pub const DEFAULT_ICEBERG_COMPACTION_INTERVAL: u64 = 3600;

    #[test]
3054 fn test_compatible_arrow_schema() {
3055 use arrow_schema_iceberg::{DataType as ArrowDataType, Field as ArrowField};
3056
3057 use super::*;
3058 let risingwave_schema = Schema::new(vec![
3059 Field::with_name(DataType::Int32, "a"),
3060 Field::with_name(DataType::Int32, "b"),
3061 Field::with_name(DataType::Int32, "c"),
3062 ]);
3063 let arrow_schema = ArrowSchema::new(vec![
3064 ArrowField::new("a", ArrowDataType::Int32, false),
3065 ArrowField::new("b", ArrowDataType::Int32, false),
3066 ArrowField::new("c", ArrowDataType::Int32, false),
3067 ]);
3068
3069 try_matches_arrow_schema(&risingwave_schema, &arrow_schema).unwrap();
3070
3071 let risingwave_schema = Schema::new(vec![
3072 Field::with_name(DataType::Int32, "d"),
3073 Field::with_name(DataType::Int32, "c"),
3074 Field::with_name(DataType::Int32, "a"),
3075 Field::with_name(DataType::Int32, "b"),
3076 ]);
3077 let arrow_schema = ArrowSchema::new(vec![
3078 ArrowField::new("a", ArrowDataType::Int32, false),
3079 ArrowField::new("b", ArrowDataType::Int32, false),
3080 ArrowField::new("d", ArrowDataType::Int32, false),
3081 ArrowField::new("c", ArrowDataType::Int32, false),
3082 ]);
3083 try_matches_arrow_schema(&risingwave_schema, &arrow_schema).unwrap();
3084
3085 let risingwave_schema = Schema::new(vec![
3086 Field::with_name(
3087 DataType::Struct(StructType::new(vec![
3088 ("a1", DataType::Int32),
3089 (
3090 "a2",
3091 DataType::Struct(StructType::new(vec![
3092 ("a21", DataType::Bytea),
3093 (
3094 "a22",
3095 DataType::Map(MapType::from_kv(DataType::Varchar, DataType::Jsonb)),
3096 ),
3097 ])),
3098 ),
3099 ])),
3100 "a",
3101 ),
3102 Field::with_name(
3103 DataType::list(DataType::Struct(StructType::new(vec![
3104 ("b1", DataType::Int32),
3105 ("b2", DataType::Bytea),
3106 (
3107 "b3",
3108 DataType::Map(MapType::from_kv(DataType::Varchar, DataType::Jsonb)),
3109 ),
3110 ]))),
3111 "b",
3112 ),
3113 Field::with_name(
3114 DataType::Map(MapType::from_kv(
3115 DataType::Varchar,
3116 DataType::list(DataType::Struct(StructType::new([
3117 ("c1", DataType::Int32),
3118 ("c2", DataType::Bytea),
3119 (
3120 "c3",
3121 DataType::Map(MapType::from_kv(DataType::Varchar, DataType::Jsonb)),
3122 ),
3123 ]))),
3124 )),
3125 "c",
3126 ),
3127 ]);
3128 let arrow_schema = ArrowSchema::new(vec![
3129 ArrowField::new(
3130 "a",
3131 ArrowDataType::Struct(ArrowFields::from(vec![
3132 ArrowField::new("a1", ArrowDataType::Int32, false),
3133 ArrowField::new(
3134 "a2",
3135 ArrowDataType::Struct(ArrowFields::from(vec![
3136 ArrowField::new("a21", ArrowDataType::LargeBinary, false),
3137 ArrowField::new_map(
3138 "a22",
3139 "entries",
3140 ArrowFieldRef::new(ArrowField::new(
3141 "key",
3142 ArrowDataType::Utf8,
3143 false,
3144 )),
3145 ArrowFieldRef::new(ArrowField::new(
3146 "value",
3147 ArrowDataType::Utf8,
3148 false,
3149 )),
3150 false,
3151 false,
3152 ),
3153 ])),
3154 false,
3155 ),
3156 ])),
3157 false,
3158 ),
3159 ArrowField::new(
3160 "b",
3161 ArrowDataType::List(ArrowFieldRef::new(ArrowField::new_list_field(
3162 ArrowDataType::Struct(ArrowFields::from(vec![
3163 ArrowField::new("b1", ArrowDataType::Int32, false),
3164 ArrowField::new("b2", ArrowDataType::LargeBinary, false),
3165 ArrowField::new_map(
3166 "b3",
3167 "entries",
3168 ArrowFieldRef::new(ArrowField::new("key", ArrowDataType::Utf8, false)),
3169 ArrowFieldRef::new(ArrowField::new(
3170 "value",
3171 ArrowDataType::Utf8,
3172 false,
3173 )),
3174 false,
3175 false,
3176 ),
3177 ])),
3178 false,
3179 ))),
3180 false,
3181 ),
3182 ArrowField::new_map(
3183 "c",
3184 "entries",
3185 ArrowFieldRef::new(ArrowField::new("key", ArrowDataType::Utf8, false)),
3186 ArrowFieldRef::new(ArrowField::new(
3187 "value",
3188 ArrowDataType::List(ArrowFieldRef::new(ArrowField::new_list_field(
3189 ArrowDataType::Struct(ArrowFields::from(vec![
3190 ArrowField::new("c1", ArrowDataType::Int32, false),
3191 ArrowField::new("c2", ArrowDataType::LargeBinary, false),
3192 ArrowField::new_map(
3193 "c3",
3194 "entries",
3195 ArrowFieldRef::new(ArrowField::new(
3196 "key",
3197 ArrowDataType::Utf8,
3198 false,
3199 )),
3200 ArrowFieldRef::new(ArrowField::new(
3201 "value",
3202 ArrowDataType::Utf8,
3203 false,
3204 )),
3205 false,
3206 false,
3207 ),
3208 ])),
3209 false,
3210 ))),
3211 false,
3212 )),
3213 false,
3214 false,
3215 ),
3216 ]);
3217 try_matches_arrow_schema(&risingwave_schema, &arrow_schema).unwrap();
3218 }
3219
3220 #[test]
3221 fn test_parse_iceberg_config() {
3222 let values = [
3223 ("connector", "iceberg"),
3224 ("type", "upsert"),
3225 ("primary_key", "v1"),
3226 ("partition_by", "v1, identity(v1), truncate(4,v2), bucket(5,v1), year(v3), month(v4), day(v5), hour(v6), void(v1)"),
3227 ("warehouse.path", "s3://iceberg"),
3228 ("s3.endpoint", "http://127.0.0.1:9301"),
3229 ("s3.access.key", "hummockadmin"),
3230 ("s3.secret.key", "hummockadmin"),
3231 ("s3.path.style.access", "true"),
3232 ("s3.region", "us-east-1"),
3233 ("catalog.type", "jdbc"),
3234 ("catalog.name", "demo"),
3235 ("catalog.uri", "jdbc://postgresql://postgres:5432/iceberg"),
3236 ("catalog.jdbc.user", "admin"),
3237 ("catalog.jdbc.password", "123456"),
3238 ("database.name", "demo_db"),
3239 ("table.name", "demo_table"),
3240 ("enable_compaction", "true"),
3241 ("compaction_interval_sec", "1800"),
3242 ("enable_snapshot_expiration", "true"),
3243 ]
3244 .into_iter()
3245 .map(|(k, v)| (k.to_owned(), v.to_owned()))
3246 .collect();
3247
3248 let iceberg_config = IcebergConfig::from_btreemap(values).unwrap();
3249
3250 let expected_iceberg_config = IcebergConfig {
3251 common: IcebergCommon {
3252 warehouse_path: Some("s3://iceberg".to_owned()),
3253 catalog_uri: Some("jdbc://postgresql://postgres:5432/iceberg".to_owned()),
3254 s3_region: Some("us-east-1".to_owned()),
3255 s3_endpoint: Some("http://127.0.0.1:9301".to_owned()),
3256 s3_access_key: Some("hummockadmin".to_owned()),
3257 s3_secret_key: Some("hummockadmin".to_owned()),
3258 s3_iam_role_arn: None,
3259 gcs_credential: None,
3260 catalog_type: Some("jdbc".to_owned()),
3261 glue_id: None,
3262 glue_region: None,
3263 glue_access_key: None,
3264 glue_secret_key: None,
3265 glue_iam_role_arn: None,
3266 catalog_name: Some("demo".to_owned()),
3267 s3_path_style_access: Some(true),
3268 catalog_credential: None,
3269 catalog_oauth2_server_uri: None,
3270 catalog_scope: None,
3271 catalog_token: None,
3272 enable_config_load: None,
3273 rest_signing_name: None,
3274 rest_signing_region: None,
3275 rest_sigv4_enabled: None,
3276 hosted_catalog: None,
3277 azblob_account_name: None,
3278 azblob_account_key: None,
3279 azblob_endpoint_url: None,
3280 catalog_header: None,
3281 adlsgen2_account_name: None,
3282 adlsgen2_account_key: None,
3283 adlsgen2_endpoint: None,
3284 vended_credentials: None,
3285 catalog_security: None,
3286 gcp_auth_scopes: None,
3287 catalog_io_impl: None,
3288 },
3289 table: IcebergTableIdentifier {
3290 database_name: Some("demo_db".to_owned()),
3291 table_name: "demo_table".to_owned(),
3292 },
3293 r#type: "upsert".to_owned(),
3294 force_append_only: false,
3295 primary_key: Some(vec!["v1".to_owned()]),
3296 partition_by: Some("v1, identity(v1), truncate(4,v2), bucket(5,v1), year(v3), month(v4), day(v5), hour(v6), void(v1)".to_owned()),
3297 java_catalog_props: [("jdbc.user", "admin"), ("jdbc.password", "123456")]
3298 .into_iter()
3299 .map(|(k, v)| (k.to_owned(), v.to_owned()))
3300 .collect(),
3301 commit_checkpoint_interval: ICEBERG_DEFAULT_COMMIT_CHECKPOINT_INTERVAL,
3302 commit_checkpoint_size_threshold_mb: Some(
3303 ICEBERG_DEFAULT_COMMIT_CHECKPOINT_SIZE_THRESHOLD_MB,
3304 ),
3305 create_table_if_not_exists: false,
3306 is_exactly_once: Some(true),
3307 commit_retry_num: 8,
3308 enable_compaction: true,
3309 compaction_interval_sec: Some(DEFAULT_ICEBERG_COMPACTION_INTERVAL / 2),
3310 enable_snapshot_expiration: true,
3311 write_mode: IcebergWriteMode::MergeOnRead,
3312 format_version: FormatVersion::V2,
3313 snapshot_expiration_max_age_millis: None,
3314 snapshot_expiration_retain_last: None,
3315 snapshot_expiration_clear_expired_files: true,
3316 snapshot_expiration_clear_expired_meta_data: true,
3317 max_snapshots_num_before_compaction: None,
3318 small_files_threshold_mb: None,
3319 delete_files_count_threshold: None,
3320 trigger_snapshot_count: None,
3321 target_file_size_mb: None,
3322 compaction_type: None,
3323 write_parquet_compression: None,
3324 write_parquet_max_row_group_rows: None,
3325 };
3326
3327 assert_eq!(iceberg_config, expected_iceberg_config);
3328
3329 assert_eq!(
3330 &iceberg_config.full_table_name().unwrap().to_string(),
3331 "demo_db.demo_table"
3332 );
3333 }
3334
3335 #[test]
3336 fn test_parse_commit_checkpoint_size_threshold() {
3337 let values: BTreeMap<String, String> = [
3338 ("connector", "iceberg"),
3339 ("type", "append-only"),
3340 ("force_append_only", "true"),
3341 ("catalog.name", "test-catalog"),
3342 ("catalog.type", "storage"),
3343 ("warehouse.path", "s3://my-bucket/warehouse"),
3344 ("database.name", "test_db"),
3345 ("table.name", "test_table"),
3346 ("commit_checkpoint_size_threshold_mb", "128"),
3347 ]
3348 .into_iter()
3349 .map(|(k, v)| (k.to_owned(), v.to_owned()))
3350 .collect();
3351
3352 let config = IcebergConfig::from_btreemap(values).unwrap();
3353 assert_eq!(config.commit_checkpoint_size_threshold_mb, Some(128));
3354 assert_eq!(
3355 config.commit_checkpoint_size_threshold_bytes(),
3356 Some(128 * 1024 * 1024)
3357 );
3358 }
3359
3360 #[test]
3361 fn test_default_commit_checkpoint_size_threshold() {
3362 let values: BTreeMap<String, String> = [
3363 ("connector", "iceberg"),
3364 ("type", "append-only"),
3365 ("force_append_only", "true"),
3366 ("catalog.name", "test-catalog"),
3367 ("catalog.type", "storage"),
3368 ("warehouse.path", "s3://my-bucket/warehouse"),
3369 ("database.name", "test_db"),
3370 ("table.name", "test_table"),
3371 ]
3372 .into_iter()
3373 .map(|(k, v)| (k.to_owned(), v.to_owned()))
3374 .collect();
3375
3376 let config = IcebergConfig::from_btreemap(values).unwrap();
3377 assert_eq!(
3378 config.commit_checkpoint_size_threshold_mb,
3379 Some(ICEBERG_DEFAULT_COMMIT_CHECKPOINT_SIZE_THRESHOLD_MB)
3380 );
3381 assert_eq!(
3382 config.commit_checkpoint_size_threshold_bytes(),
3383 Some(ICEBERG_DEFAULT_COMMIT_CHECKPOINT_SIZE_THRESHOLD_MB * 1024 * 1024)
3384 );
3385 }
3386
3387 #[test]
3388 fn test_reject_zero_commit_checkpoint_size_threshold() {
3389 let values: BTreeMap<String, String> = [
3390 ("connector", "iceberg"),
3391 ("type", "append-only"),
3392 ("force_append_only", "true"),
3393 ("catalog.name", "test-catalog"),
3394 ("catalog.type", "storage"),
3395 ("warehouse.path", "s3://my-bucket/warehouse"),
3396 ("database.name", "test_db"),
3397 ("table.name", "test_table"),
3398 ("commit_checkpoint_size_threshold_mb", "0"),
3399 ]
3400 .into_iter()
3401 .map(|(k, v)| (k.to_owned(), v.to_owned()))
3402 .collect();
3403
3404 let err = IcebergConfig::from_btreemap(values).unwrap_err();
3405 assert!(
3406 err.to_string()
3407 .contains("`commit_checkpoint_size_threshold_mb` must be greater than 0")
3408 );
3409 }
3410
3411 async fn test_create_catalog(configs: BTreeMap<String, String>) {
3412 let iceberg_config = IcebergConfig::from_btreemap(configs).unwrap();
3413
3414 let _table = iceberg_config.load_table().await.unwrap();
3415 }
3416
3417 #[tokio::test]
3418 #[ignore]
3419 async fn test_storage_catalog() {
3420 let values = [
3421 ("connector", "iceberg"),
3422 ("type", "append-only"),
3423 ("force_append_only", "true"),
3424 ("s3.endpoint", "http://127.0.0.1:9301"),
3425 ("s3.access.key", "hummockadmin"),
3426 ("s3.secret.key", "hummockadmin"),
3427 ("s3.region", "us-east-1"),
3428 ("s3.path.style.access", "true"),
3429 ("catalog.name", "demo"),
3430 ("catalog.type", "storage"),
3431 ("warehouse.path", "s3://icebergdata/demo"),
3432 ("database.name", "s1"),
3433 ("table.name", "t1"),
3434 ]
3435 .into_iter()
3436 .map(|(k, v)| (k.to_owned(), v.to_owned()))
3437 .collect();
3438
3439 test_create_catalog(values).await;
3440 }
3441
3442 #[tokio::test]
3443 #[ignore]
3444 async fn test_rest_catalog() {
3445 let values = [
3446 ("connector", "iceberg"),
3447 ("type", "append-only"),
3448 ("force_append_only", "true"),
3449 ("s3.endpoint", "http://127.0.0.1:9301"),
3450 ("s3.access.key", "hummockadmin"),
3451 ("s3.secret.key", "hummockadmin"),
3452 ("s3.region", "us-east-1"),
3453 ("s3.path.style.access", "true"),
3454 ("catalog.name", "demo"),
3455 ("catalog.type", "rest"),
3456 ("catalog.uri", "http://192.168.167.4:8181"),
3457 ("warehouse.path", "s3://icebergdata/demo"),
3458 ("database.name", "s1"),
3459 ("table.name", "t1"),
3460 ]
3461 .into_iter()
3462 .map(|(k, v)| (k.to_owned(), v.to_owned()))
3463 .collect();
3464
3465 test_create_catalog(values).await;
3466 }
3467
3468 #[tokio::test]
3469 #[ignore]
3470 async fn test_jdbc_catalog() {
3471 let values = [
3472 ("connector", "iceberg"),
3473 ("type", "append-only"),
3474 ("force_append_only", "true"),
3475 ("s3.endpoint", "http://127.0.0.1:9301"),
3476 ("s3.access.key", "hummockadmin"),
3477 ("s3.secret.key", "hummockadmin"),
3478 ("s3.region", "us-east-1"),
3479 ("s3.path.style.access", "true"),
3480 ("catalog.name", "demo"),
3481 ("catalog.type", "jdbc"),
3482 ("catalog.uri", "jdbc:postgresql://localhost:5432/iceberg"),
3483 ("catalog.jdbc.user", "admin"),
3484 ("catalog.jdbc.password", "123456"),
3485 ("warehouse.path", "s3://icebergdata/demo"),
3486 ("database.name", "s1"),
3487 ("table.name", "t1"),
3488 ]
3489 .into_iter()
3490 .map(|(k, v)| (k.to_owned(), v.to_owned()))
3491 .collect();
3492
3493 test_create_catalog(values).await;
3494 }
3495
3496 #[tokio::test]
3497 #[ignore]
3498 async fn test_hive_catalog() {
3499 let values = [
3500 ("connector", "iceberg"),
3501 ("type", "append-only"),
3502 ("force_append_only", "true"),
3503 ("s3.endpoint", "http://127.0.0.1:9301"),
3504 ("s3.access.key", "hummockadmin"),
3505 ("s3.secret.key", "hummockadmin"),
3506 ("s3.region", "us-east-1"),
3507 ("s3.path.style.access", "true"),
3508 ("catalog.name", "demo"),
3509 ("catalog.type", "hive"),
3510 ("catalog.uri", "thrift://localhost:9083"),
3511 ("warehouse.path", "s3://icebergdata/demo"),
3512 ("database.name", "s1"),
3513 ("table.name", "t1"),
3514 ]
3515 .into_iter()
3516 .map(|(k, v)| (k.to_owned(), v.to_owned()))
3517 .collect();
3518
3519 test_create_catalog(values).await;
3520 }
3521
3522 #[test]
3524 fn test_parse_google_auth_config() {
3525 let values: BTreeMap<String, String> = [
3526 ("connector", "iceberg"),
3527 ("type", "append-only"),
3528 ("force_append_only", "true"),
3529 ("catalog.name", "biglake-catalog"),
3530 ("catalog.type", "rest"),
3531 ("catalog.uri", "https://biglake.googleapis.com/iceberg/v1/restcatalog"),
3532 ("warehouse.path", "bq://projects/my-gcp-project"),
3533 ("catalog.header", "x-goog-user-project=my-gcp-project"),
3534 ("catalog.security", "google"),
3535 ("gcp.auth.scopes", "https://www.googleapis.com/auth/cloud-platform,https://www.googleapis.com/auth/bigquery"),
3536 ("database.name", "my_dataset"),
3537 ("table.name", "my_table"),
3538 ]
3539 .into_iter()
3540 .map(|(k, v)| (k.to_owned(), v.to_owned()))
3541 .collect();
3542
3543 let config = IcebergConfig::from_btreemap(values).unwrap();
3544 assert_eq!(config.catalog_type(), "rest");
3545 assert_eq!(config.common.catalog_security.as_deref(), Some("google"));
3546 assert_eq!(
3547 config.common.gcp_auth_scopes.as_deref(),
3548 Some(
3549 "https://www.googleapis.com/auth/cloud-platform,https://www.googleapis.com/auth/bigquery"
3550 )
3551 );
3552 assert_eq!(
3553 config.common.warehouse_path.as_deref(),
3554 Some("bq://projects/my-gcp-project")
3555 );
3556 assert_eq!(
3557 config.common.catalog_header.as_deref(),
3558 Some("x-goog-user-project=my-gcp-project")
3559 );
3560 }
3561
3562 #[test]
3564 fn test_parse_oauth2_security_config() {
3565 let values: BTreeMap<String, String> = [
3566 ("connector", "iceberg"),
3567 ("type", "append-only"),
3568 ("force_append_only", "true"),
3569 ("catalog.name", "oauth2-catalog"),
3570 ("catalog.type", "rest"),
3571 ("catalog.uri", "https://example.com/iceberg/rest"),
3572 ("warehouse.path", "s3://my-bucket/warehouse"),
3573 ("catalog.security", "oauth2"),
3574 ("catalog.credential", "client_id:client_secret"),
3575 ("catalog.token", "bearer-token"),
3576 (
3577 "catalog.oauth2_server_uri",
3578 "https://oauth.example.com/token",
3579 ),
3580 ("catalog.scope", "read write"),
3581 ("database.name", "test_db"),
3582 ("table.name", "test_table"),
3583 ]
3584 .into_iter()
3585 .map(|(k, v)| (k.to_owned(), v.to_owned()))
3586 .collect();
3587
3588 let iceberg_config = IcebergConfig::from_btreemap(values).unwrap();
3589
3590 assert_eq!(iceberg_config.catalog_type(), "rest");
3592
3593 assert_eq!(
3595 iceberg_config.common.catalog_security.as_deref(),
3596 Some("oauth2")
3597 );
3598 assert_eq!(
3599 iceberg_config.common.catalog_credential.as_deref(),
3600 Some("client_id:client_secret")
3601 );
3602 assert_eq!(
3603 iceberg_config.common.catalog_token.as_deref(),
3604 Some("bearer-token")
3605 );
3606 assert_eq!(
3607 iceberg_config.common.catalog_oauth2_server_uri.as_deref(),
3608 Some("https://oauth.example.com/token")
3609 );
3610 assert_eq!(
3611 iceberg_config.common.catalog_scope.as_deref(),
3612 Some("read write")
3613 );
3614 }
3615
3616 #[test]
3618 fn test_parse_invalid_security_config() {
3619 let values: BTreeMap<String, String> = [
3620 ("connector", "iceberg"),
3621 ("type", "append-only"),
3622 ("force_append_only", "true"),
3623 ("catalog.name", "invalid-catalog"),
3624 ("catalog.type", "rest"),
3625 ("catalog.uri", "https://example.com/iceberg/rest"),
3626 ("warehouse.path", "s3://my-bucket/warehouse"),
3627 ("catalog.security", "invalid_security_type"),
3628 ("database.name", "test_db"),
3629 ("table.name", "test_table"),
3630 ]
3631 .into_iter()
3632 .map(|(k, v)| (k.to_owned(), v.to_owned()))
3633 .collect();
3634
3635 let iceberg_config = IcebergConfig::from_btreemap(values).unwrap();
3637
3638 assert_eq!(
3640 iceberg_config.common.catalog_security.as_deref(),
3641 Some("invalid_security_type")
3642 );
3643
3644 assert_eq!(iceberg_config.catalog_type(), "rest");
3646 }
3647
3648 #[test]
3650 fn test_parse_custom_io_impl_config() {
3651 let values: BTreeMap<String, String> = [
3652 ("connector", "iceberg"),
3653 ("type", "append-only"),
3654 ("force_append_only", "true"),
3655 ("catalog.name", "gcs-catalog"),
3656 ("catalog.type", "rest"),
3657 ("catalog.uri", "https://example.com/iceberg/rest"),
3658 ("warehouse.path", "gs://my-bucket/warehouse"),
3659 ("catalog.security", "google"),
3660 ("catalog.io_impl", "org.apache.iceberg.gcp.gcs.GCSFileIO"),
3661 ("database.name", "test_db"),
3662 ("table.name", "test_table"),
3663 ]
3664 .into_iter()
3665 .map(|(k, v)| (k.to_owned(), v.to_owned()))
3666 .collect();
3667
3668 let iceberg_config = IcebergConfig::from_btreemap(values).unwrap();
3669
3670 assert_eq!(iceberg_config.catalog_type(), "rest");
3672
3673 assert_eq!(
3675 iceberg_config.common.catalog_io_impl.as_deref(),
3676 Some("org.apache.iceberg.gcp.gcs.GCSFileIO")
3677 );
3678
3679 assert_eq!(
3681 iceberg_config.common.catalog_security.as_deref(),
3682 Some("google")
3683 );
3684 }
3685
3686 #[test]
3687 fn test_config_constants_consistency() {
3688 assert_eq!(ENABLE_COMPACTION, "enable_compaction");
3691 assert_eq!(COMPACTION_INTERVAL_SEC, "compaction_interval_sec");
3692 assert_eq!(ENABLE_SNAPSHOT_EXPIRATION, "enable_snapshot_expiration");
3693 assert_eq!(WRITE_MODE, "write_mode");
3694 assert_eq!(
3695 COMMIT_CHECKPOINT_SIZE_THRESHOLD_MB,
3696 "commit_checkpoint_size_threshold_mb"
3697 );
3698 assert_eq!(
3699 SNAPSHOT_EXPIRATION_RETAIN_LAST,
3700 "snapshot_expiration_retain_last"
3701 );
3702 assert_eq!(
3703 SNAPSHOT_EXPIRATION_MAX_AGE_MILLIS,
3704 "snapshot_expiration_max_age_millis"
3705 );
3706 assert_eq!(
3707 SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_FILES,
3708 "snapshot_expiration_clear_expired_files"
3709 );
3710 assert_eq!(
3711 SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_META_DATA,
3712 "snapshot_expiration_clear_expired_meta_data"
3713 );
3714 assert_eq!(COMPACTION_MAX_SNAPSHOTS_NUM, "compaction.max_snapshots_num");
3715 }
3716
3717 #[test]
3719 fn test_parse_compaction_config() {
3720 let values: BTreeMap<String, String> = [
3722 ("connector", "iceberg"),
3723 ("type", "upsert"),
3724 ("primary_key", "id"),
3725 ("warehouse.path", "s3://iceberg"),
3726 ("s3.endpoint", "http://127.0.0.1:9301"),
3727 ("s3.access.key", "test"),
3728 ("s3.secret.key", "test"),
3729 ("s3.region", "us-east-1"),
3730 ("catalog.type", "storage"),
3731 ("catalog.name", "demo"),
3732 ("database.name", "test_db"),
3733 ("table.name", "test_table"),
3734 ("enable_compaction", "true"),
3735 ("compaction.max_snapshots_num", "100"),
3736 ("compaction.small_files_threshold_mb", "512"),
3737 ("compaction.delete_files_count_threshold", "50"),
3738 ("compaction.trigger_snapshot_count", "10"),
3739 ("compaction.target_file_size_mb", "256"),
3740 ("compaction.type", "full"),
3741 ("compaction.write_parquet_compression", "zstd"),
3742 ("compaction.write_parquet_max_row_group_rows", "50000"),
3743 ]
3744 .into_iter()
3745 .map(|(k, v)| (k.to_owned(), v.to_owned()))
3746 .collect();
3747
3748 let config = IcebergConfig::from_btreemap(values).unwrap();
3749 assert!(config.enable_compaction);
3750 assert_eq!(config.max_snapshots_num_before_compaction, Some(100));
3751 assert_eq!(config.small_files_threshold_mb, Some(512));
3752 assert_eq!(config.delete_files_count_threshold, Some(50));
3753 assert_eq!(config.trigger_snapshot_count, Some(10));
3754 assert_eq!(config.target_file_size_mb, Some(256));
3755 assert_eq!(config.compaction_type, Some(CompactionType::Full));
3756 assert_eq!(config.target_file_size_mb(), 256);
3757 assert_eq!(config.write_parquet_compression(), "zstd");
3758 assert_eq!(config.write_parquet_max_row_group_rows(), 50000);
3759
3760 let values: BTreeMap<String, String> = [
3762 ("connector", "iceberg"),
3763 ("type", "append-only"),
3764 ("force_append_only", "true"),
3765 ("catalog.name", "test-catalog"),
3766 ("catalog.type", "storage"),
3767 ("warehouse.path", "s3://my-bucket/warehouse"),
3768 ("database.name", "test_db"),
3769 ("table.name", "test_table"),
3770 ]
3771 .into_iter()
3772 .map(|(k, v)| (k.to_owned(), v.to_owned()))
3773 .collect();
3774
3775 let config = IcebergConfig::from_btreemap(values).unwrap();
3776        assert_eq!(config.target_file_size_mb(), 1024);
        assert_eq!(config.write_parquet_compression(), "zstd");
        assert_eq!(config.write_parquet_max_row_group_rows(), 122880);
    }
3780
3781 #[test]
3783 fn test_parse_parquet_compression() {
3784 use parquet::basic::Compression;
3785
3786 use super::parse_parquet_compression;
3787
3788 assert!(matches!(
3790 parse_parquet_compression("snappy"),
3791 Compression::SNAPPY
3792 ));
3793 assert!(matches!(
3794 parse_parquet_compression("gzip"),
3795 Compression::GZIP(_)
3796 ));
3797 assert!(matches!(
3798 parse_parquet_compression("zstd"),
3799 Compression::ZSTD(_)
3800 ));
3801 assert!(matches!(parse_parquet_compression("lz4"), Compression::LZ4));
3802 assert!(matches!(
3803 parse_parquet_compression("brotli"),
3804 Compression::BROTLI(_)
3805 ));
3806 assert!(matches!(
3807 parse_parquet_compression("uncompressed"),
3808 Compression::UNCOMPRESSED
3809 ));
3810
3811 assert!(matches!(
3813 parse_parquet_compression("SNAPPY"),
3814 Compression::SNAPPY
3815 ));
3816 assert!(matches!(
3817 parse_parquet_compression("Zstd"),
3818 Compression::ZSTD(_)
3819 ));
3820
3821 assert!(matches!(
3823 parse_parquet_compression("invalid"),
3824 Compression::SNAPPY
3825 ));
3826 }
3827
3828 #[test]
3829 fn test_append_only_rejects_copy_on_write() {
3830 let values = [
3832 ("connector", "iceberg"),
3833 ("type", "append-only"),
3834 ("warehouse.path", "s3://iceberg"),
3835 ("s3.endpoint", "http://127.0.0.1:9301"),
3836 ("s3.access.key", "test"),
3837 ("s3.secret.key", "test"),
3838 ("s3.region", "us-east-1"),
3839 ("catalog.type", "storage"),
3840 ("catalog.name", "demo"),
3841 ("database.name", "test_db"),
3842 ("table.name", "test_table"),
3843 ("write_mode", "copy-on-write"),
3844 ]
3845 .into_iter()
3846 .map(|(k, v)| (k.to_owned(), v.to_owned()))
3847 .collect();
3848
3849 let result = IcebergConfig::from_btreemap(values);
3850 assert!(result.is_err());
3851 assert!(
3852 result
3853 .unwrap_err()
3854 .to_string()
3855 .contains("'copy-on-write' mode is not supported for append-only iceberg sink")
3856 );
3857 }
3858
3859 #[test]
3860 fn test_append_only_accepts_merge_on_read() {
3861 let values = [
3863 ("connector", "iceberg"),
3864 ("type", "append-only"),
3865 ("warehouse.path", "s3://iceberg"),
3866 ("s3.endpoint", "http://127.0.0.1:9301"),
3867 ("s3.access.key", "test"),
3868 ("s3.secret.key", "test"),
3869 ("s3.region", "us-east-1"),
3870 ("catalog.type", "storage"),
3871 ("catalog.name", "demo"),
3872 ("database.name", "test_db"),
3873 ("table.name", "test_table"),
3874 ("write_mode", "merge-on-read"),
3875 ]
3876 .into_iter()
3877 .map(|(k, v)| (k.to_owned(), v.to_owned()))
3878 .collect();
3879
3880 let result = IcebergConfig::from_btreemap(values);
3881 assert!(result.is_ok());
3882 let config = result.unwrap();
3883 assert_eq!(config.write_mode, IcebergWriteMode::MergeOnRead);
3884 }
3885
3886 #[test]
3887 fn test_append_only_defaults_to_merge_on_read() {
3888 let values = [
3890 ("connector", "iceberg"),
3891 ("type", "append-only"),
3892 ("warehouse.path", "s3://iceberg"),
3893 ("s3.endpoint", "http://127.0.0.1:9301"),
3894 ("s3.access.key", "test"),
3895 ("s3.secret.key", "test"),
3896 ("s3.region", "us-east-1"),
3897 ("catalog.type", "storage"),
3898 ("catalog.name", "demo"),
3899 ("database.name", "test_db"),
3900 ("table.name", "test_table"),
3901 ]
3902 .into_iter()
3903 .map(|(k, v)| (k.to_owned(), v.to_owned()))
3904 .collect();
3905
3906 let result = IcebergConfig::from_btreemap(values);
3907 assert!(result.is_ok());
3908 let config = result.unwrap();
3909 assert_eq!(config.write_mode, IcebergWriteMode::MergeOnRead);
3910 }
3911
3912 #[test]
3913 fn test_upsert_accepts_copy_on_write() {
3914 let values = [
3916 ("connector", "iceberg"),
3917 ("type", "upsert"),
3918 ("primary_key", "id"),
3919 ("warehouse.path", "s3://iceberg"),
3920 ("s3.endpoint", "http://127.0.0.1:9301"),
3921 ("s3.access.key", "test"),
3922 ("s3.secret.key", "test"),
3923 ("s3.region", "us-east-1"),
3924 ("catalog.type", "storage"),
3925 ("catalog.name", "demo"),
3926 ("database.name", "test_db"),
3927 ("table.name", "test_table"),
3928 ("write_mode", "copy-on-write"),
3929 ]
3930 .into_iter()
3931 .map(|(k, v)| (k.to_owned(), v.to_owned()))
3932 .collect();
3933
3934 let result = IcebergConfig::from_btreemap(values);
3935 assert!(result.is_ok());
3936 let config = result.unwrap();
3937 assert_eq!(config.write_mode, IcebergWriteMode::CopyOnWrite);
3938 }
3939}