mod prometheus;

use std::collections::{BTreeMap, HashMap};
use std::fmt::Debug;
use std::num::NonZeroU64;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;

use anyhow::{Context, anyhow};
use async_trait::async_trait;
use await_tree::InstrumentAwait;
use iceberg::arrow::{
    RecordBatchPartitionSplitter, arrow_schema_to_schema, schema_to_arrow_schema,
};
use iceberg::spec::{
    DataFile, FormatVersion, MAIN_BRANCH, Operation, PartitionSpecRef,
    SchemaRef as IcebergSchemaRef, SerializedDataFile, TableProperties, Transform,
    UnboundPartitionField, UnboundPartitionSpec,
};
use iceberg::table::Table;
use iceberg::transaction::{ApplyTransactionAction, FastAppendAction, Transaction};
use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
use iceberg::writer::base_writer::deletion_vector_writer::{
    DeletionVectorWriter, DeletionVectorWriterBuilder,
};
use iceberg::writer::base_writer::equality_delete_writer::{
    EqualityDeleteFileWriterBuilder, EqualityDeleteWriterConfig,
};
use iceberg::writer::base_writer::position_delete_file_writer::{
    POSITION_DELETE_SCHEMA, PositionDeleteFileWriter, PositionDeleteFileWriterBuilder,
    PositionDeleteInput,
};
use iceberg::writer::delta_writer::{DELETE_OP, DeltaWriterBuilder, INSERT_OP};
use iceberg::writer::file_writer::ParquetWriterBuilder;
use iceberg::writer::file_writer::location_generator::{
    DefaultFileNameGenerator, DefaultLocationGenerator,
};
use iceberg::writer::file_writer::rolling_writer::RollingFileWriterBuilder;
use iceberg::writer::task_writer::TaskWriter;
use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
use iceberg::{Catalog, NamespaceIdent, TableCreation, TableIdent};
use itertools::Itertools;
use parquet::basic::Compression;
use parquet::file::properties::WriterProperties;
use prometheus::monitored_general_writer::MonitoredGeneralWriterBuilder;
use regex::Regex;
use risingwave_common::array::arrow::arrow_array_iceberg::{Int32Array, RecordBatch};
use risingwave_common::array::arrow::arrow_schema_iceberg::{
    self, DataType as ArrowDataType, Field as ArrowField, Fields as ArrowFields,
    Schema as ArrowSchema, SchemaRef,
};
use risingwave_common::array::arrow::{IcebergArrowConvert, IcebergCreateTableArrowConvert};
use risingwave_common::array::{Op, StreamChunk};
use risingwave_common::bail;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{Field, Schema};
use risingwave_common::error::IcebergError;
use risingwave_common::metrics::{LabelGuardedHistogram, LabelGuardedIntCounter};
use risingwave_common_estimate_size::EstimateSize;
use risingwave_pb::connector_service::SinkMetadata;
use risingwave_pb::connector_service::sink_metadata::Metadata::Serialized;
use risingwave_pb::connector_service::sink_metadata::SerializedMetadata;
use risingwave_pb::stream_plan::PbSinkSchemaChange;
use serde::de::{self, Deserializer, Visitor};
use serde::{Deserialize, Serialize};
use serde_json::from_value;
use serde_with::{DisplayFromStr, serde_as};
use thiserror_ext::AsReport;
use tokio::sync::mpsc::UnboundedSender;
use tokio_retry::RetryIf;
use tokio_retry::strategy::{ExponentialBackoff, jitter};
use tracing::warn;
use url::Url;
use uuid::Uuid;
use with_options::WithOptions;

use super::decouple_checkpoint_log_sink::iceberg_default_commit_checkpoint_interval;
use super::{
    GLOBAL_SINK_METRICS, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT, Sink,
    SinkError, SinkWriterParam,
};
use crate::connector_common::{IcebergCommon, IcebergSinkCompactionUpdate, IcebergTableIdentifier};
use crate::enforce_secret::EnforceSecret;
use crate::sink::catalog::SinkId;
use crate::sink::coordinate::CoordinatedLogSinker;
use crate::sink::writer::SinkWriter;
use crate::sink::{
    Result, SinglePhaseCommitCoordinator, SinkCommitCoordinator, SinkParam,
    TwoPhaseCommitCoordinator,
};
use crate::{deserialize_bool_from_string, deserialize_optional_string_seq_from_string};

pub const ICEBERG_SINK: &str = "iceberg";

pub const ICEBERG_COW_BRANCH: &str = "ingestion";
pub const ICEBERG_WRITE_MODE_MERGE_ON_READ: &str = "merge-on-read";
pub const ICEBERG_WRITE_MODE_COPY_ON_WRITE: &str = "copy-on-write";
pub const ICEBERG_COMPACTION_TYPE_FULL: &str = "full";
pub const ICEBERG_COMPACTION_TYPE_SMALL_FILES: &str = "small-files";
pub const ICEBERG_COMPACTION_TYPE_FILES_WITH_DELETE: &str = "files-with-delete";

pub const PARTITION_DATA_ID_START: i32 = 1000;

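/// How row-level changes are applied to the Iceberg table.
///
/// - `merge-on-read` (default): changes are written as data files plus delete
///   files (deletion vectors on format v3) that readers merge at query time.
/// - `copy-on-write`: rejected for append-only sinks and only compatible with
///   the `full` compaction type (see `validate_append_only_write_mode` and
///   `IcebergSink::validate`).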
118#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default, Serialize, Deserialize)]
119#[serde(rename_all = "kebab-case")]
120pub enum IcebergWriteMode {
121 #[default]
122 MergeOnRead,
123 CopyOnWrite,
124}
125
126impl IcebergWriteMode {
127 pub fn as_str(self) -> &'static str {
128 match self {
129 IcebergWriteMode::MergeOnRead => ICEBERG_WRITE_MODE_MERGE_ON_READ,
130 IcebergWriteMode::CopyOnWrite => ICEBERG_WRITE_MODE_COPY_ON_WRITE,
131 }
132 }
133}
134
135impl std::str::FromStr for IcebergWriteMode {
136 type Err = SinkError;
137
138 fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
139 match s {
140 ICEBERG_WRITE_MODE_MERGE_ON_READ => Ok(IcebergWriteMode::MergeOnRead),
141 ICEBERG_WRITE_MODE_COPY_ON_WRITE => Ok(IcebergWriteMode::CopyOnWrite),
142 _ => Err(SinkError::Config(anyhow!(format!(
143 "invalid write_mode: {}, must be one of: {}, {}",
144 s, ICEBERG_WRITE_MODE_MERGE_ON_READ, ICEBERG_WRITE_MODE_COPY_ON_WRITE
145 )))),
146 }
147 }
148}
149
150impl TryFrom<&str> for IcebergWriteMode {
151 type Error = <Self as std::str::FromStr>::Err;
152
153 fn try_from(value: &str) -> std::result::Result<Self, Self::Error> {
154 value.parse()
155 }
156}
157
158impl TryFrom<String> for IcebergWriteMode {
159 type Error = <Self as std::str::FromStr>::Err;
160
161 fn try_from(value: String) -> std::result::Result<Self, Self::Error> {
162 value.as_str().parse()
163 }
164}
165
166impl std::fmt::Display for IcebergWriteMode {
167 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
168 f.write_str(self.as_str())
169 }
170}
171
172pub const ENABLE_COMPACTION: &str = "enable_compaction";
174pub const COMPACTION_INTERVAL_SEC: &str = "compaction_interval_sec";
175pub const ENABLE_SNAPSHOT_EXPIRATION: &str = "enable_snapshot_expiration";
176pub const WRITE_MODE: &str = "write_mode";
177pub const FORMAT_VERSION: &str = "format_version";
178pub const SNAPSHOT_EXPIRATION_RETAIN_LAST: &str = "snapshot_expiration_retain_last";
179pub const SNAPSHOT_EXPIRATION_MAX_AGE_MILLIS: &str = "snapshot_expiration_max_age_millis";
180pub const SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_FILES: &str = "snapshot_expiration_clear_expired_files";
181pub const SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_META_DATA: &str =
182 "snapshot_expiration_clear_expired_meta_data";
183pub const COMPACTION_MAX_SNAPSHOTS_NUM: &str = "compaction.max_snapshots_num";
184
185pub const COMPACTION_SMALL_FILES_THRESHOLD_MB: &str = "compaction.small_files_threshold_mb";
186
187pub const COMPACTION_DELETE_FILES_COUNT_THRESHOLD: &str = "compaction.delete_files_count_threshold";
188
189pub const COMPACTION_TRIGGER_SNAPSHOT_COUNT: &str = "compaction.trigger_snapshot_count";
190
191pub const COMPACTION_TARGET_FILE_SIZE_MB: &str = "compaction.target_file_size_mb";
192
193pub const COMPACTION_TYPE: &str = "compaction.type";
194
195pub const COMPACTION_WRITE_PARQUET_COMPRESSION: &str = "compaction.write_parquet_compression";
196pub const COMPACTION_WRITE_PARQUET_MAX_ROW_GROUP_ROWS: &str =
197 "compaction.write_parquet_max_row_group_rows";
198
199const PARQUET_CREATED_BY: &str = concat!("risingwave version ", env!("CARGO_PKG_VERSION"));
200
201fn default_commit_retry_num() -> u32 {
202 8
203}
204
205fn default_iceberg_write_mode() -> IcebergWriteMode {
206 IcebergWriteMode::MergeOnRead
207}
208
209fn default_iceberg_format_version() -> FormatVersion {
210 FormatVersion::V2
211}
212
213fn default_true() -> bool {
214 true
215}
216
217fn default_some_true() -> Option<bool> {
218 Some(true)
219}
220
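/// Parses a `format_version` value into a [`FormatVersion`], accepting only
/// `"1"`, `"2"`, or `"3"` after trimming surrounding whitespace.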
221fn parse_format_version_str(value: &str) -> std::result::Result<FormatVersion, String> {
222 let parsed = value
223 .trim()
224 .parse::<u8>()
        .map_err(|_| "`format_version` must be one of 1, 2, or 3".to_owned())?;
226 match parsed {
227 1 => Ok(FormatVersion::V1),
228 2 => Ok(FormatVersion::V2),
229 3 => Ok(FormatVersion::V3),
        _ => Err("`format_version` must be one of 1, 2, or 3".to_owned()),
231 }
232}
233
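/// Serde helper for the `format_version` option: accepts the version either as
/// an integer (`1`, `2`, `3`) or as a string (`"1"`, `"2"`, `"3"`).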
234fn deserialize_format_version<'de, D>(
235 deserializer: D,
236) -> std::result::Result<FormatVersion, D::Error>
237where
238 D: Deserializer<'de>,
239{
240 struct FormatVersionVisitor;
241
242 impl<'de> Visitor<'de> for FormatVersionVisitor {
243 type Value = FormatVersion;
244
245 fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
            formatter.write_str("`format_version` as 1, 2, or 3 (integer or string)")
247 }
248
249 fn visit_u64<E>(self, value: u64) -> std::result::Result<Self::Value, E>
250 where
251 E: de::Error,
252 {
253 let value = u8::try_from(value)
                .map_err(|_| E::custom("`format_version` must be one of 1, 2, or 3"))?;
255 parse_format_version_str(&value.to_string()).map_err(E::custom)
256 }
257
258 fn visit_i64<E>(self, value: i64) -> std::result::Result<Self::Value, E>
259 where
260 E: de::Error,
261 {
262 let value = u8::try_from(value)
                .map_err(|_| E::custom("`format_version` must be one of 1, 2, or 3"))?;
264 parse_format_version_str(&value.to_string()).map_err(E::custom)
265 }
266
267 fn visit_str<E>(self, value: &str) -> std::result::Result<Self::Value, E>
268 where
269 E: de::Error,
270 {
271 parse_format_version_str(value).map_err(E::custom)
272 }
273
274 fn visit_string<E>(self, value: String) -> std::result::Result<Self::Value, E>
275 where
276 E: de::Error,
277 {
278 self.visit_str(&value)
279 }
280 }
281
282 deserializer.deserialize_any(FormatVersionVisitor)
283}
284
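/// Compaction strategy selected via `compaction.type`.
///
/// - `full` (default): no restriction on which files a compaction run may rewrite.
/// - `small-files`: targets small files (see `compaction.small_files_threshold_mb`);
///   requires merge-on-read and the `IcebergCompaction` license feature.
/// - `files-with-delete`: targets data files that carry delete files; requires
///   merge-on-read and the `IcebergCompaction` license feature.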
285#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
287#[serde(rename_all = "kebab-case")]
288pub enum CompactionType {
289 #[default]
291 Full,
292 SmallFiles,
294 FilesWithDelete,
296}
297
298impl CompactionType {
299 pub fn as_str(&self) -> &'static str {
300 match self {
301 CompactionType::Full => ICEBERG_COMPACTION_TYPE_FULL,
302 CompactionType::SmallFiles => ICEBERG_COMPACTION_TYPE_SMALL_FILES,
303 CompactionType::FilesWithDelete => ICEBERG_COMPACTION_TYPE_FILES_WITH_DELETE,
304 }
305 }
306}
307
308impl std::str::FromStr for CompactionType {
309 type Err = SinkError;
310
311 fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
312 match s {
313 ICEBERG_COMPACTION_TYPE_FULL => Ok(CompactionType::Full),
314 ICEBERG_COMPACTION_TYPE_SMALL_FILES => Ok(CompactionType::SmallFiles),
315 ICEBERG_COMPACTION_TYPE_FILES_WITH_DELETE => Ok(CompactionType::FilesWithDelete),
316 _ => Err(SinkError::Config(anyhow!(format!(
317 "invalid compaction_type: {}, must be one of: {}, {}, {}",
318 s,
319 ICEBERG_COMPACTION_TYPE_FULL,
320 ICEBERG_COMPACTION_TYPE_SMALL_FILES,
321 ICEBERG_COMPACTION_TYPE_FILES_WITH_DELETE
322 )))),
323 }
324 }
325}
326
327impl TryFrom<&str> for CompactionType {
328 type Error = <Self as std::str::FromStr>::Err;
329
330 fn try_from(value: &str) -> std::result::Result<Self, Self::Error> {
331 value.parse()
332 }
333}
334
335impl TryFrom<String> for CompactionType {
336 type Error = <Self as std::str::FromStr>::Err;
337
338 fn try_from(value: String) -> std::result::Result<Self, Self::Error> {
339 value.as_str().parse()
340 }
341}
342
343impl std::fmt::Display for CompactionType {
344 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
345 write!(f, "{}", self.as_str())
346 }
347}
348
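/// Connector configuration for the Iceberg sink, deserialized from the sink's
/// `WITH` options (see [`IcebergConfig::from_btreemap`]). Any `catalog.*` key
/// other than `catalog.uri`, `catalog.type`, `catalog.name`, and
/// `catalog.header` is forwarded to the Java catalog via `java_catalog_props`
/// with the `catalog.` prefix stripped.
///
/// Minimal illustrative sketch (not exhaustive; the catalog, warehouse, and
/// table identifier options are flattened from [`IcebergCommon`] and
/// [`IcebergTableIdentifier`] and are omitted here):
///
/// ```ignore
/// let props: BTreeMap<String, String> = [
///     ("type", "upsert"),
///     ("primary_key", "id"),
///     ("write_mode", "merge-on-read"),
///     // ... plus catalog / warehouse / table identifier options ...
/// ]
/// .into_iter()
/// .map(|(k, v)| (k.to_owned(), v.to_owned()))
/// .collect();
/// let config = IcebergConfig::from_btreemap(props)?;
/// ```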
349#[serde_as]
350#[derive(Debug, Clone, PartialEq, Eq, Deserialize, WithOptions)]
351pub struct IcebergConfig {
    pub r#type: String,

    #[serde(default, deserialize_with = "deserialize_bool_from_string")]
355 pub force_append_only: bool,
356
357 #[serde(flatten)]
358 common: IcebergCommon,
359
360 #[serde(flatten)]
361 table: IcebergTableIdentifier,
362
363 #[serde(
364 rename = "primary_key",
365 default,
366 deserialize_with = "deserialize_optional_string_seq_from_string"
367 )]
368 pub primary_key: Option<Vec<String>>,
369
370 #[serde(skip)]
372 pub java_catalog_props: HashMap<String, String>,
373
374 #[serde(default)]
375 pub partition_by: Option<String>,
376
377 #[serde(default = "iceberg_default_commit_checkpoint_interval")]
379 #[serde_as(as = "DisplayFromStr")]
380 #[with_option(allow_alter_on_fly)]
381 pub commit_checkpoint_interval: u64,
382
383 #[serde(default, deserialize_with = "deserialize_bool_from_string")]
384 pub create_table_if_not_exists: bool,
385
386 #[serde(default = "default_some_true")]
388 #[serde_as(as = "Option<DisplayFromStr>")]
389 pub is_exactly_once: Option<bool>,
390 #[serde(default = "default_commit_retry_num")]
395 pub commit_retry_num: u32,
396
397 #[serde(
399 rename = "enable_compaction",
400 default,
401 deserialize_with = "deserialize_bool_from_string"
402 )]
403 #[with_option(allow_alter_on_fly)]
404 pub enable_compaction: bool,
405
406 #[serde(rename = "compaction_interval_sec", default)]
408 #[serde_as(as = "Option<DisplayFromStr>")]
409 #[with_option(allow_alter_on_fly)]
410 pub compaction_interval_sec: Option<u64>,
411
412 #[serde(
414 rename = "enable_snapshot_expiration",
415 default,
416 deserialize_with = "deserialize_bool_from_string"
417 )]
418 #[with_option(allow_alter_on_fly)]
419 pub enable_snapshot_expiration: bool,
420
421 #[serde(rename = "write_mode", default = "default_iceberg_write_mode")]
423 pub write_mode: IcebergWriteMode,
424
425 #[serde(
427 rename = "format_version",
428 default = "default_iceberg_format_version",
429 deserialize_with = "deserialize_format_version"
430 )]
431 pub format_version: FormatVersion,
432
433 #[serde(rename = "snapshot_expiration_max_age_millis", default)]
436 #[serde_as(as = "Option<DisplayFromStr>")]
437 #[with_option(allow_alter_on_fly)]
438 pub snapshot_expiration_max_age_millis: Option<i64>,
439
440 #[serde(rename = "snapshot_expiration_retain_last", default)]
442 #[serde_as(as = "Option<DisplayFromStr>")]
443 #[with_option(allow_alter_on_fly)]
444 pub snapshot_expiration_retain_last: Option<i32>,
445
446 #[serde(
447 rename = "snapshot_expiration_clear_expired_files",
448 default = "default_true",
449 deserialize_with = "deserialize_bool_from_string"
450 )]
451 #[with_option(allow_alter_on_fly)]
452 pub snapshot_expiration_clear_expired_files: bool,
453
454 #[serde(
455 rename = "snapshot_expiration_clear_expired_meta_data",
456 default = "default_true",
457 deserialize_with = "deserialize_bool_from_string"
458 )]
459 #[with_option(allow_alter_on_fly)]
460 pub snapshot_expiration_clear_expired_meta_data: bool,
461
462 #[serde(rename = "compaction.max_snapshots_num", default)]
465 #[serde_as(as = "Option<DisplayFromStr>")]
466 #[with_option(allow_alter_on_fly)]
467 pub max_snapshots_num_before_compaction: Option<usize>,
468
469 #[serde(rename = "compaction.small_files_threshold_mb", default)]
470 #[serde_as(as = "Option<DisplayFromStr>")]
471 #[with_option(allow_alter_on_fly)]
472 pub small_files_threshold_mb: Option<u64>,
473
474 #[serde(rename = "compaction.delete_files_count_threshold", default)]
475 #[serde_as(as = "Option<DisplayFromStr>")]
476 #[with_option(allow_alter_on_fly)]
477 pub delete_files_count_threshold: Option<usize>,
478
479 #[serde(rename = "compaction.trigger_snapshot_count", default)]
480 #[serde_as(as = "Option<DisplayFromStr>")]
481 #[with_option(allow_alter_on_fly)]
482 pub trigger_snapshot_count: Option<usize>,
483
484 #[serde(rename = "compaction.target_file_size_mb", default)]
485 #[serde_as(as = "Option<DisplayFromStr>")]
486 #[with_option(allow_alter_on_fly)]
487 pub target_file_size_mb: Option<u64>,
488
489 #[serde(rename = "compaction.type", default)]
492 #[with_option(allow_alter_on_fly)]
493 pub compaction_type: Option<CompactionType>,
494
495 #[serde(rename = "compaction.write_parquet_compression", default)]
499 #[with_option(allow_alter_on_fly)]
500 pub write_parquet_compression: Option<String>,
501
502 #[serde(rename = "compaction.write_parquet_max_row_group_rows", default)]
505 #[serde_as(as = "Option<DisplayFromStr>")]
506 #[with_option(allow_alter_on_fly)]
507 pub write_parquet_max_row_group_rows: Option<usize>,
508}
509
510impl EnforceSecret for IcebergConfig {
511 fn enforce_secret<'a>(
512 prop_iter: impl Iterator<Item = &'a str>,
513 ) -> crate::error::ConnectorResult<()> {
514 for prop in prop_iter {
515 IcebergCommon::enforce_one(prop)?;
516 }
517 Ok(())
518 }
519
520 fn enforce_one(prop: &str) -> crate::error::ConnectorResult<()> {
521 IcebergCommon::enforce_one(prop)
522 }
523}
524
525impl IcebergConfig {
526 fn validate_append_only_write_mode(
529 sink_type: &str,
530 write_mode: IcebergWriteMode,
531 ) -> Result<()> {
532 if sink_type == SINK_TYPE_APPEND_ONLY && write_mode == IcebergWriteMode::CopyOnWrite {
533 return Err(SinkError::Config(anyhow!(
534 "'copy-on-write' mode is not supported for append-only iceberg sink. \
535 Please use 'merge-on-read' instead, which is strictly better for append-only workloads."
536 )));
537 }
538 Ok(())
539 }
540
541 pub fn from_btreemap(values: BTreeMap<String, String>) -> Result<Self> {
542 let mut config =
543 serde_json::from_value::<IcebergConfig>(serde_json::to_value(&values).unwrap())
544 .map_err(|e| SinkError::Config(anyhow!(e)))?;
545
546 if config.r#type != SINK_TYPE_APPEND_ONLY && config.r#type != SINK_TYPE_UPSERT {
547 return Err(SinkError::Config(anyhow!(
                "`{}` must be either {} or {}",
549 SINK_TYPE_OPTION,
550 SINK_TYPE_APPEND_ONLY,
551 SINK_TYPE_UPSERT
552 )));
553 }
554
555 if config.r#type == SINK_TYPE_UPSERT {
556 if let Some(primary_key) = &config.primary_key {
557 if primary_key.is_empty() {
558 return Err(SinkError::Config(anyhow!(
                        "`primary_key` must not be empty in {}",
560 SINK_TYPE_UPSERT
561 )));
562 }
563 } else {
564 return Err(SinkError::Config(anyhow!(
                    "Must set `primary_key` in {}",
566 SINK_TYPE_UPSERT
567 )));
568 }
569 }
570
571 Self::validate_append_only_write_mode(&config.r#type, config.write_mode)?;
573
574 config.java_catalog_props = values
576 .iter()
577 .filter(|(k, _v)| {
578 k.starts_with("catalog.")
579 && k != &"catalog.uri"
580 && k != &"catalog.type"
581 && k != &"catalog.name"
582 && k != &"catalog.header"
583 })
584 .map(|(k, v)| (k[8..].to_string(), v.clone()))
585 .collect();
586
587 if config.commit_checkpoint_interval == 0 {
588 return Err(SinkError::Config(anyhow!(
                "`commit_checkpoint_interval` must be greater than 0"
590 )));
591 }
592
593 Ok(config)
594 }
595
596 pub fn catalog_type(&self) -> &str {
597 self.common.catalog_type()
598 }
599
600 pub async fn load_table(&self) -> Result<Table> {
601 self.common
602 .load_table(&self.table, &self.java_catalog_props)
603 .await
604 .map_err(Into::into)
605 }
606
607 pub async fn create_catalog(&self) -> Result<Arc<dyn Catalog>> {
608 self.common
609 .create_catalog(&self.java_catalog_props)
610 .await
611 .map_err(Into::into)
612 }
613
614 pub fn full_table_name(&self) -> Result<TableIdent> {
615 self.table.to_table_ident().map_err(Into::into)
616 }
617
618 pub fn catalog_name(&self) -> String {
619 self.common.catalog_name()
620 }
621
622 pub fn table_format_version(&self) -> FormatVersion {
623 self.format_version
624 }
625
626 pub fn compaction_interval_sec(&self) -> u64 {
627 self.compaction_interval_sec.unwrap_or(3600)
629 }
630
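    /// Oldest snapshot timestamp (epoch millis) to retain, computed as
    /// `current_time_ms - snapshot_expiration_max_age_millis`; `None` when
    /// age-based snapshot expiration is not configured.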
631 pub fn snapshot_expiration_timestamp_ms(&self, current_time_ms: i64) -> Option<i64> {
634 self.snapshot_expiration_max_age_millis
635 .map(|max_age_millis| current_time_ms - max_age_millis)
636 }
637
638 pub fn trigger_snapshot_count(&self) -> usize {
639 self.trigger_snapshot_count.unwrap_or(usize::MAX)
640 }
641
642 pub fn small_files_threshold_mb(&self) -> u64 {
643 self.small_files_threshold_mb.unwrap_or(64)
644 }
645
646 pub fn delete_files_count_threshold(&self) -> usize {
647 self.delete_files_count_threshold.unwrap_or(256)
648 }
649
650 pub fn target_file_size_mb(&self) -> u64 {
651 self.target_file_size_mb.unwrap_or(1024)
652 }
653
654 pub fn compaction_type(&self) -> CompactionType {
657 self.compaction_type.unwrap_or_default()
658 }
659
660 pub fn write_parquet_compression(&self) -> &str {
663 self.write_parquet_compression.as_deref().unwrap_or("zstd")
664 }
665
666 pub fn write_parquet_max_row_group_rows(&self) -> usize {
669 self.write_parquet_max_row_group_rows.unwrap_or(122880)
670 }
671
672 pub fn get_parquet_compression(&self) -> Compression {
675 parse_parquet_compression(self.write_parquet_compression())
676 }
677}
678
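/// Maps a codec name (case-insensitive) to a Parquet [`Compression`], using the
/// library's default level for gzip, brotli, and zstd and falling back to
/// SNAPPY with a warning for unknown codecs.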
679fn parse_parquet_compression(codec: &str) -> Compression {
681 match codec.to_lowercase().as_str() {
682 "uncompressed" => Compression::UNCOMPRESSED,
683 "snappy" => Compression::SNAPPY,
684 "gzip" => Compression::GZIP(Default::default()),
685 "lzo" => Compression::LZO,
686 "brotli" => Compression::BROTLI(Default::default()),
687 "lz4" => Compression::LZ4,
688 "zstd" => Compression::ZSTD(Default::default()),
689 _ => {
690 tracing::warn!(
691 "Unknown compression codec '{}', falling back to SNAPPY",
692 codec
693 );
694 Compression::SNAPPY
695 }
696 }
697}
698
699pub struct IcebergSink {
700 pub config: IcebergConfig,
701 param: SinkParam,
702 unique_column_ids: Option<Vec<usize>>,
704}
705
706impl EnforceSecret for IcebergSink {
707 fn enforce_secret<'a>(
708 prop_iter: impl Iterator<Item = &'a str>,
709 ) -> crate::error::ConnectorResult<()> {
710 for prop in prop_iter {
711 IcebergConfig::enforce_one(prop)?;
712 }
713 Ok(())
714 }
715}
716
717impl TryFrom<SinkParam> for IcebergSink {
718 type Error = SinkError;
719
720 fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
721 let config = IcebergConfig::from_btreemap(param.properties.clone())?;
722 IcebergSink::new(config, param)
723 }
724}
725
726impl Debug for IcebergSink {
727 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
728 f.debug_struct("IcebergSink")
729 .field("config", &self.config)
730 .finish()
731 }
732}
733
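/// Creates the target table if requested, loads it, and verifies that the sink
/// schema is compatible with the table's current Iceberg schema (via
/// `try_matches_arrow_schema`).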
734async fn create_and_validate_table_impl(
735 config: &IcebergConfig,
736 param: &SinkParam,
737) -> Result<Table> {
738 if config.create_table_if_not_exists {
739 create_table_if_not_exists_impl(config, param).await?;
740 }
741
742 let table = config
743 .load_table()
744 .await
745 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
746
747 let sink_schema = param.schema();
748 let iceberg_arrow_schema = schema_to_arrow_schema(table.metadata().current_schema())
749 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
750
751 try_matches_arrow_schema(&sink_schema, &iceberg_arrow_schema)
752 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
753
754 Ok(table)
755}
756
757async fn create_table_if_not_exists_impl(config: &IcebergConfig, param: &SinkParam) -> Result<()> {
758 let catalog = config.create_catalog().await?;
759 let namespace = if let Some(database_name) = config.table.database_name() {
760 let namespace = NamespaceIdent::new(database_name.to_owned());
761 if !catalog
762 .namespace_exists(&namespace)
763 .await
764 .map_err(|e| SinkError::Iceberg(anyhow!(e)))?
765 {
766 catalog
767 .create_namespace(&namespace, HashMap::default())
768 .await
769 .map_err(|e| SinkError::Iceberg(anyhow!(e)))
770 .context("failed to create iceberg namespace")?;
771 }
772 namespace
773 } else {
774 bail!("database name must be set if you want to create table")
775 };
776
777 let table_id = config
778 .full_table_name()
779 .context("Unable to parse table name")?;
780 if !catalog
781 .table_exists(&table_id)
782 .await
783 .map_err(|e| SinkError::Iceberg(anyhow!(e)))?
784 {
785 let iceberg_create_table_arrow_convert = IcebergCreateTableArrowConvert::default();
786 let arrow_fields = param
788 .columns
789 .iter()
790 .map(|column| {
791 Ok(iceberg_create_table_arrow_convert
792 .to_arrow_field(&column.name, &column.data_type)
793 .map_err(|e| SinkError::Iceberg(anyhow!(e)))
794 .context(format!(
795 "failed to convert {}: {} to arrow type",
796 &column.name, &column.data_type
797 ))?)
798 })
799 .collect::<Result<Vec<ArrowField>>>()?;
800 let arrow_schema = arrow_schema_iceberg::Schema::new(arrow_fields);
801 let iceberg_schema = iceberg::arrow::arrow_schema_to_schema(&arrow_schema)
802 .map_err(|e| SinkError::Iceberg(anyhow!(e)))
803 .context("failed to convert arrow schema to iceberg schema")?;
804
805 let location = {
806 let mut names = namespace.clone().inner();
807 names.push(config.table.table_name().to_owned());
808 match &config.common.warehouse_path {
809 Some(warehouse_path) => {
810 let is_s3_tables = warehouse_path.starts_with("arn:aws:s3tables");
811 let is_bq_catalog_federation = warehouse_path.starts_with("bq://");
813 let url = Url::parse(warehouse_path);
814 if url.is_err() || is_s3_tables || is_bq_catalog_federation {
815 if config.common.catalog_type() == "rest"
818 || config.common.catalog_type() == "rest_rust"
819 {
820 None
821 } else {
822 bail!(format!("Invalid warehouse path: {}", warehouse_path))
823 }
824 } else if warehouse_path.ends_with('/') {
825 Some(format!("{}{}", warehouse_path, names.join("/")))
826 } else {
827 Some(format!("{}/{}", warehouse_path, names.join("/")))
828 }
829 }
830 None => None,
831 }
832 };
833
834 let partition_spec = match &config.partition_by {
835 Some(partition_by) => {
836 let mut partition_fields = Vec::<UnboundPartitionField>::new();
837 for (i, (column, transform)) in parse_partition_by_exprs(partition_by.clone())?
838 .into_iter()
839 .enumerate()
840 {
841 match iceberg_schema.field_id_by_name(&column) {
842 Some(id) => partition_fields.push(
843 UnboundPartitionField::builder()
844 .source_id(id)
845 .transform(transform)
846 .name(format!("_p_{}", column))
847 .field_id(PARTITION_DATA_ID_START + i as i32)
848 .build(),
849 ),
850 None => bail!(format!(
851 "Partition source column does not exist in schema: {}",
852 column
853 )),
854 };
855 }
856 Some(
857 UnboundPartitionSpec::builder()
858 .with_spec_id(0)
859 .add_partition_fields(partition_fields)
860 .map_err(|e| SinkError::Iceberg(anyhow!(e)))
861 .context("failed to add partition columns")?
862 .build(),
863 )
864 }
865 None => None,
866 };
867
868 let properties = HashMap::from([(
870 TableProperties::PROPERTY_FORMAT_VERSION.to_owned(),
871 (config.format_version as u8).to_string(),
872 )]);
873
874 let table_creation_builder = TableCreation::builder()
875 .name(config.table.table_name().to_owned())
876 .schema(iceberg_schema)
877 .format_version(config.table_format_version())
878 .properties(properties);
879
880 let table_creation = match (location, partition_spec) {
881 (Some(location), Some(partition_spec)) => table_creation_builder
882 .location(location)
883 .partition_spec(partition_spec)
884 .build(),
885 (Some(location), None) => table_creation_builder.location(location).build(),
886 (None, Some(partition_spec)) => table_creation_builder
887 .partition_spec(partition_spec)
888 .build(),
889 (None, None) => table_creation_builder.build(),
890 };
891
892 catalog
893 .create_table(&namespace, table_creation)
894 .await
895 .map_err(|e| SinkError::Iceberg(anyhow!(e)))
896 .context("failed to create iceberg table")?;
897 }
898 Ok(())
899}
900
901impl IcebergSink {
902 pub async fn create_and_validate_table(&self) -> Result<Table> {
903 create_and_validate_table_impl(&self.config, &self.param).await
904 }
905
906 pub async fn create_table_if_not_exists(&self) -> Result<()> {
907 create_table_if_not_exists_impl(&self.config, &self.param).await
908 }
909
910 pub fn new(config: IcebergConfig, param: SinkParam) -> Result<Self> {
911 let unique_column_ids = if config.r#type == SINK_TYPE_UPSERT && !config.force_append_only {
912 if let Some(pk) = &config.primary_key {
913 let mut unique_column_ids = Vec::with_capacity(pk.len());
914 for col_name in pk {
915 let id = param
916 .columns
917 .iter()
918 .find(|col| col.name.as_str() == col_name)
919 .ok_or_else(|| {
920 SinkError::Config(anyhow!(
921 "Primary key column {} not found in sink schema",
922 col_name
923 ))
924 })?
925 .column_id
926 .get_id() as usize;
927 unique_column_ids.push(id);
928 }
929 Some(unique_column_ids)
930 } else {
931 unreachable!()
932 }
933 } else {
934 None
935 };
936 Ok(Self {
937 config,
938 param,
939 unique_column_ids,
940 })
941 }
942}
943
944impl Sink for IcebergSink {
945 type LogSinker = CoordinatedLogSinker<IcebergSinkWriter>;
946
947 const SINK_NAME: &'static str = ICEBERG_SINK;
948
949 async fn validate(&self) -> Result<()> {
950 if "snowflake".eq_ignore_ascii_case(self.config.catalog_type()) {
951 bail!("Snowflake catalog only supports iceberg sources");
952 }
953
954 if "glue".eq_ignore_ascii_case(self.config.catalog_type()) {
955 risingwave_common::license::Feature::IcebergSinkWithGlue
956 .check_available()
957 .map_err(|e| anyhow::anyhow!(e))?;
958 }
959
960 IcebergConfig::validate_append_only_write_mode(
962 &self.config.r#type,
963 self.config.write_mode,
964 )?;
965
966 let compaction_type = self.config.compaction_type();
968
969 if self.config.write_mode == IcebergWriteMode::CopyOnWrite
972 && compaction_type != CompactionType::Full
973 {
974 bail!(
975 "'copy-on-write' mode only supports 'full' compaction type, got: '{}'",
976 compaction_type
977 );
978 }
979
980 match compaction_type {
981 CompactionType::SmallFiles => {
982 risingwave_common::license::Feature::IcebergCompaction
984 .check_available()
985 .map_err(|e| anyhow::anyhow!(e))?;
986
987 if self.config.write_mode != IcebergWriteMode::MergeOnRead {
989 bail!(
990 "'small-files' compaction type only supports 'merge-on-read' write mode, got: '{}'",
991 self.config.write_mode
992 );
993 }
994
995 if self.config.delete_files_count_threshold.is_some() {
997 bail!(
                        "`compaction.delete_files_count_threshold` is not supported for 'small-files' compaction type"
999 );
1000 }
1001 }
1002 CompactionType::FilesWithDelete => {
1003 risingwave_common::license::Feature::IcebergCompaction
1005 .check_available()
1006 .map_err(|e| anyhow::anyhow!(e))?;
1007
1008 if self.config.write_mode != IcebergWriteMode::MergeOnRead {
1010 bail!(
1011 "'files-with-delete' compaction type only supports 'merge-on-read' write mode, got: '{}'",
1012 self.config.write_mode
1013 );
1014 }
1015
1016 if self.config.small_files_threshold_mb.is_some() {
1018 bail!(
                        "`compaction.small_files_threshold_mb` must not be set for 'files-with-delete' compaction type"
1020 );
1021 }
1022 }
            CompactionType::Full => {}
        }
1027
1028 let _ = self.create_and_validate_table().await?;
1029 Ok(())
1030 }
1031
1032 fn support_schema_change() -> bool {
1033 true
1034 }
1035
1036 fn validate_alter_config(config: &BTreeMap<String, String>) -> Result<()> {
1037 let iceberg_config = IcebergConfig::from_btreemap(config.clone())?;
1038
1039 if let Some(compaction_interval) = iceberg_config.compaction_interval_sec {
1041 if iceberg_config.enable_compaction && compaction_interval == 0 {
1042 bail!(
                    "`compaction_interval_sec` must be greater than 0 when `enable_compaction` is true"
1044 );
1045 }
1046
1047 tracing::info!(
1049 "Alter config compaction_interval set to {} seconds",
1050 compaction_interval
1051 );
1052 }
1053
1054 if let Some(max_snapshots) = iceberg_config.max_snapshots_num_before_compaction
1056 && max_snapshots < 1
1057 {
1058 bail!(
                "`compaction.max_snapshots_num` must be greater than 0, got: {}",
1060 max_snapshots
1061 );
1062 }
1063
1064 if let Some(target_file_size_mb) = iceberg_config.target_file_size_mb
1066 && target_file_size_mb == 0
1067 {
1068 bail!("`compaction.target_file_size_mb` must be greater than 0");
1069 }
1070
1071 if let Some(max_row_group_rows) = iceberg_config.write_parquet_max_row_group_rows
1073 && max_row_group_rows == 0
1074 {
1075 bail!("`compaction.write_parquet_max_row_group_rows` must be greater than 0");
1076 }
1077
1078 if let Some(ref compression) = iceberg_config.write_parquet_compression {
1080 let valid_codecs = [
1081 "uncompressed",
1082 "snappy",
1083 "gzip",
1084 "lzo",
1085 "brotli",
1086 "lz4",
1087 "zstd",
1088 ];
1089 if !valid_codecs.contains(&compression.to_lowercase().as_str()) {
1090 bail!(
1091 "`compaction.write_parquet_compression` must be one of {:?}, got: {}",
1092 valid_codecs,
1093 compression
1094 );
1095 }
1096 }
1097
1098 Ok(())
1099 }
1100
1101 async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
1102 let writer = IcebergSinkWriter::new(
1103 self.config.clone(),
1104 self.param.clone(),
1105 writer_param.clone(),
1106 self.unique_column_ids.clone(),
1107 );
1108
1109 let commit_checkpoint_interval =
1110 NonZeroU64::new(self.config.commit_checkpoint_interval).expect(
1111 "commit_checkpoint_interval should be greater than 0, and it should be checked in config validation",
1112 );
1113 let log_sinker = CoordinatedLogSinker::new(
1114 &writer_param,
1115 self.param.clone(),
1116 writer,
1117 commit_checkpoint_interval,
1118 )
1119 .await?;
1120
1121 Ok(log_sinker)
1122 }
1123
1124 fn is_coordinated_sink(&self) -> bool {
1125 true
1126 }
1127
1128 async fn new_coordinator(
1129 &self,
1130 iceberg_compact_stat_sender: Option<UnboundedSender<IcebergSinkCompactionUpdate>>,
1131 ) -> Result<SinkCommitCoordinator> {
1132 let catalog = self.config.create_catalog().await?;
1133 let table = self.create_and_validate_table().await?;
1134 let coordinator = IcebergSinkCommitter {
1135 catalog,
1136 table,
1137 last_commit_epoch: 0,
1138 sink_id: self.param.sink_id,
1139 config: self.config.clone(),
1140 param: self.param.clone(),
1141 commit_retry_num: self.config.commit_retry_num,
1142 iceberg_compact_stat_sender,
1143 };
1144 if self.config.is_exactly_once.unwrap_or_default() {
1145 Ok(SinkCommitCoordinator::TwoPhase(Box::new(coordinator)))
1146 } else {
1147 Ok(SinkCommitCoordinator::SinglePhase(Box::new(coordinator)))
1148 }
1149 }
1150}
1151
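/// Projection state for stripping the optional extra partition column
/// (`SinkWriterParam::extra_partition_col_idx`) out of incoming chunks:
/// `None` means nothing to strip, `Prepare(idx)` holds the column index until
/// the first chunk arrives, and `Done(indices)` caches the computed projection.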
1152enum ProjectIdxVec {
1159 None,
1160 Prepare(usize),
1161 Done(Vec<usize>),
1162}
1163
1164type DataFileWriterBuilderType =
1165 DataFileWriterBuilder<ParquetWriterBuilder, DefaultLocationGenerator, DefaultFileNameGenerator>;
1166type PositionDeleteFileWriterBuilderType = PositionDeleteFileWriterBuilder<
1167 ParquetWriterBuilder,
1168 DefaultLocationGenerator,
1169 DefaultFileNameGenerator,
1170>;
1171type PositionDeleteFileWriterType = PositionDeleteFileWriter<
1172 ParquetWriterBuilder,
1173 DefaultLocationGenerator,
1174 DefaultFileNameGenerator,
1175>;
1176type DeletionVectorWriterBuilderType =
1177 DeletionVectorWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>;
1178type DeletionVectorWriterType =
1179 DeletionVectorWriter<DefaultLocationGenerator, DefaultFileNameGenerator>;
1180type EqualityDeleteFileWriterBuilderType = EqualityDeleteFileWriterBuilder<
1181 ParquetWriterBuilder,
1182 DefaultLocationGenerator,
1183 DefaultFileNameGenerator,
1184>;
1185
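/// Position deletes are written either as position-delete Parquet files (format
/// v2) or as deletion vectors in Puffin files (format v3 and above); the choice
/// is made per table in `IcebergSinkWriterInner::build_upsert`.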
1186#[derive(Clone)]
1187enum PositionDeleteWriterBuilderType {
1188 PositionDelete(PositionDeleteFileWriterBuilderType),
1189 DeletionVector(DeletionVectorWriterBuilderType),
1190}
1191
1192enum PositionDeleteWriterType {
1193 PositionDelete(PositionDeleteFileWriterType),
1194 DeletionVector(DeletionVectorWriterType),
1195}
1196
1197#[async_trait]
1198impl IcebergWriterBuilder<Vec<PositionDeleteInput>> for PositionDeleteWriterBuilderType {
1199 type R = PositionDeleteWriterType;
1200
1201 async fn build(
1202 &self,
1203 partition_key: Option<iceberg::spec::PartitionKey>,
1204 ) -> iceberg::Result<Self::R> {
1205 match self {
1206 PositionDeleteWriterBuilderType::PositionDelete(builder) => Ok(
1207 PositionDeleteWriterType::PositionDelete(builder.build(partition_key).await?),
1208 ),
1209 PositionDeleteWriterBuilderType::DeletionVector(builder) => Ok(
1210 PositionDeleteWriterType::DeletionVector(builder.build(partition_key).await?),
1211 ),
1212 }
1213 }
1214}
1215
1216#[async_trait]
1217impl IcebergWriter<Vec<PositionDeleteInput>> for PositionDeleteWriterType {
1218 async fn write(&mut self, input: Vec<PositionDeleteInput>) -> iceberg::Result<()> {
1219 match self {
1220 PositionDeleteWriterType::PositionDelete(writer) => writer.write(input).await,
1221 PositionDeleteWriterType::DeletionVector(writer) => writer.write(input).await,
1222 }
1223 }
1224
1225 async fn close(&mut self) -> iceberg::Result<Vec<DataFile>> {
1226 match self {
1227 PositionDeleteWriterType::PositionDelete(writer) => writer.close().await,
1228 PositionDeleteWriterType::DeletionVector(writer) => writer.close().await,
1229 }
1230 }
1231}
1232
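/// Shares a single writer builder across the writers spawned by a
/// [`TaskWriter`] by delegating `build` through an `Arc`.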
1233#[derive(Clone)]
1234struct SharedIcebergWriterBuilder<B>(Arc<B>);
1235
1236#[async_trait]
1237impl<B: IcebergWriterBuilder> IcebergWriterBuilder for SharedIcebergWriterBuilder<B> {
1238 type R = B::R;
1239
1240 async fn build(
1241 &self,
1242 partition_key: Option<iceberg::spec::PartitionKey>,
1243 ) -> iceberg::Result<Self::R> {
1244 self.0.build(partition_key).await
1245 }
1246}
1247
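/// Wraps a writer builder together with the table schema and partition spec so
/// a fresh [`TaskWriter`] (with an optional [`RecordBatchPartitionSplitter`] for
/// partitioned tables) can be rebuilt after every checkpoint.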
1248#[derive(Clone)]
1249struct TaskWriterBuilderWrapper<B: IcebergWriterBuilder> {
1250 inner: Arc<B>,
1251 fanout_enabled: bool,
1252 schema: IcebergSchemaRef,
1253 partition_spec: PartitionSpecRef,
1254 compute_partition: bool,
1255}
1256
1257impl<B: IcebergWriterBuilder> TaskWriterBuilderWrapper<B> {
1258 fn new(
1259 inner: B,
1260 fanout_enabled: bool,
1261 schema: IcebergSchemaRef,
1262 partition_spec: PartitionSpecRef,
1263 compute_partition: bool,
1264 ) -> Self {
1265 Self {
1266 inner: Arc::new(inner),
1267 fanout_enabled,
1268 schema,
1269 partition_spec,
1270 compute_partition,
1271 }
1272 }
1273
1274 fn build(&self) -> iceberg::Result<TaskWriter<SharedIcebergWriterBuilder<B>>> {
1275 let partition_splitter = match (
1276 self.partition_spec.is_unpartitioned(),
1277 self.compute_partition,
1278 ) {
1279 (true, _) => None,
1280 (false, true) => Some(RecordBatchPartitionSplitter::try_new_with_computed_values(
1281 self.schema.clone(),
1282 self.partition_spec.clone(),
1283 )?),
1284 (false, false) => Some(
1285 RecordBatchPartitionSplitter::try_new_with_precomputed_values(
1286 self.schema.clone(),
1287 self.partition_spec.clone(),
1288 )?,
1289 ),
1290 };
1291
1292 Ok(TaskWriter::new_with_partition_splitter(
1293 SharedIcebergWriterBuilder(self.inner.clone()),
1294 self.fanout_enabled,
1295 self.schema.clone(),
1296 self.partition_spec.clone(),
1297 partition_splitter,
1298 ))
1299 }
1300}
1301
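/// The sink writer is initialized lazily: `new` only records the arguments
/// (`Created`); the Iceberg table is loaded and the underlying writers are
/// built on the first `begin_epoch` call (`Initialized`).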
1302pub enum IcebergSinkWriter {
1303 Created(IcebergSinkWriterArgs),
1304 Initialized(IcebergSinkWriterInner),
1305}
1306
1307pub struct IcebergSinkWriterArgs {
1308 config: IcebergConfig,
1309 sink_param: SinkParam,
1310 writer_param: SinkWriterParam,
1311 unique_column_ids: Option<Vec<usize>>,
1312}
1313
1314pub struct IcebergSinkWriterInner {
1315 writer: IcebergWriterDispatch,
1316 arrow_schema: SchemaRef,
1317 metrics: IcebergWriterMetrics,
1319 table: Table,
1321 project_idx_vec: ProjectIdxVec,
1324}
1325
1326#[allow(clippy::type_complexity)]
1327enum IcebergWriterDispatch {
1328 Append {
1329 writer: Option<Box<dyn IcebergWriter>>,
1330 writer_builder:
1331 TaskWriterBuilderWrapper<MonitoredGeneralWriterBuilder<DataFileWriterBuilderType>>,
1332 },
1333 Upsert {
1334 writer: Option<Box<dyn IcebergWriter>>,
1335 writer_builder: TaskWriterBuilderWrapper<
1336 MonitoredGeneralWriterBuilder<
1337 DeltaWriterBuilder<
1338 DataFileWriterBuilderType,
1339 PositionDeleteWriterBuilderType,
1340 EqualityDeleteFileWriterBuilderType,
1341 >,
1342 >,
1343 >,
1344 arrow_schema_with_op_column: SchemaRef,
1345 },
1346}
1347
1348impl IcebergWriterDispatch {
1349 pub fn get_writer(&mut self) -> Option<&mut Box<dyn IcebergWriter>> {
1350 match self {
1351 IcebergWriterDispatch::Append { writer, .. }
1352 | IcebergWriterDispatch::Upsert { writer, .. } => writer.as_mut(),
1353 }
1354 }
1355}
1356
1357pub struct IcebergWriterMetrics {
1358 _write_qps: LabelGuardedIntCounter,
1363 _write_latency: LabelGuardedHistogram,
1364 write_bytes: LabelGuardedIntCounter,
1365}
1366
1367impl IcebergSinkWriter {
1368 pub fn new(
1369 config: IcebergConfig,
1370 sink_param: SinkParam,
1371 writer_param: SinkWriterParam,
1372 unique_column_ids: Option<Vec<usize>>,
1373 ) -> Self {
1374 Self::Created(IcebergSinkWriterArgs {
1375 config,
1376 sink_param,
1377 writer_param,
1378 unique_column_ids,
1379 })
1380 }
1381}
1382
1383impl IcebergSinkWriterInner {
1384 fn build_append_only(
1385 config: &IcebergConfig,
1386 table: Table,
1387 writer_param: &SinkWriterParam,
1388 ) -> Result<Self> {
1389 let SinkWriterParam {
1390 extra_partition_col_idx,
1391 actor_id,
1392 sink_id,
1393 sink_name,
1394 ..
1395 } = writer_param;
1396 let metrics_labels = [
1397 &actor_id.to_string(),
1398 &sink_id.to_string(),
1399 sink_name.as_str(),
1400 ];
1401
1402 let write_qps = GLOBAL_SINK_METRICS
1404 .iceberg_write_qps
1405 .with_guarded_label_values(&metrics_labels);
1406 let write_latency = GLOBAL_SINK_METRICS
1407 .iceberg_write_latency
1408 .with_guarded_label_values(&metrics_labels);
1409 let _rolling_unflushed_data_file = GLOBAL_SINK_METRICS
1412 .iceberg_rolling_unflushed_data_file
1413 .with_guarded_label_values(&metrics_labels);
1414 let write_bytes = GLOBAL_SINK_METRICS
1415 .iceberg_write_bytes
1416 .with_guarded_label_values(&metrics_labels);
1417
1418 let schema = table.metadata().current_schema();
1419 let partition_spec = table.metadata().default_partition_spec();
1420 let fanout_enabled = !partition_spec.fields().is_empty();
1421 let unique_uuid_suffix = Uuid::now_v7();
1423
1424 let parquet_writer_properties = WriterProperties::builder()
1425 .set_compression(config.get_parquet_compression())
1426 .set_max_row_group_size(config.write_parquet_max_row_group_rows())
1427 .set_created_by(PARQUET_CREATED_BY.to_owned())
1428 .build();
1429
1430 let parquet_writer_builder =
1431 ParquetWriterBuilder::new(parquet_writer_properties, schema.clone());
1432 let rolling_builder = RollingFileWriterBuilder::new(
1433 parquet_writer_builder,
1434 (config.target_file_size_mb() * 1024 * 1024) as usize,
1435 table.file_io().clone(),
1436 DefaultLocationGenerator::new(table.metadata().clone())
1437 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1438 DefaultFileNameGenerator::new(
1439 writer_param.actor_id.to_string(),
1440 Some(unique_uuid_suffix.to_string()),
1441 iceberg::spec::DataFileFormat::Parquet,
1442 ),
1443 );
1444 let data_file_builder = DataFileWriterBuilder::new(rolling_builder);
1445 let monitored_builder = MonitoredGeneralWriterBuilder::new(
1446 data_file_builder,
1447 write_qps.clone(),
1448 write_latency.clone(),
1449 );
1450 let writer_builder = TaskWriterBuilderWrapper::new(
1451 monitored_builder,
1452 fanout_enabled,
1453 schema.clone(),
1454 partition_spec.clone(),
1455 true,
1456 );
1457 let inner_writer = Some(Box::new(
1458 writer_builder
1459 .build()
1460 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1461 ) as Box<dyn IcebergWriter>);
1462 Ok(Self {
1463 arrow_schema: Arc::new(
1464 schema_to_arrow_schema(table.metadata().current_schema())
1465 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1466 ),
1467 metrics: IcebergWriterMetrics {
1468 _write_qps: write_qps,
1469 _write_latency: write_latency,
1470 write_bytes,
1471 },
1472 writer: IcebergWriterDispatch::Append {
1473 writer: inner_writer,
1474 writer_builder,
1475 },
1476 table,
1477 project_idx_vec: {
1478 if let Some(extra_partition_col_idx) = extra_partition_col_idx {
1479 ProjectIdxVec::Prepare(*extra_partition_col_idx)
1480 } else {
1481 ProjectIdxVec::None
1482 }
1483 },
1484 })
1485 }
1486
1487 fn build_upsert(
1488 config: &IcebergConfig,
1489 table: Table,
1490 unique_column_ids: Vec<usize>,
1491 writer_param: &SinkWriterParam,
1492 ) -> Result<Self> {
1493 let SinkWriterParam {
1494 extra_partition_col_idx,
1495 actor_id,
1496 sink_id,
1497 sink_name,
1498 ..
1499 } = writer_param;
1500 let metrics_labels = [
1501 &actor_id.to_string(),
1502 &sink_id.to_string(),
1503 sink_name.as_str(),
1504 ];
1505 let unique_column_ids: Vec<_> = unique_column_ids.into_iter().map(|id| id as i32).collect();
1506
1507 let write_qps = GLOBAL_SINK_METRICS
1509 .iceberg_write_qps
1510 .with_guarded_label_values(&metrics_labels);
1511 let write_latency = GLOBAL_SINK_METRICS
1512 .iceberg_write_latency
1513 .with_guarded_label_values(&metrics_labels);
1514 let _rolling_unflushed_data_file = GLOBAL_SINK_METRICS
1517 .iceberg_rolling_unflushed_data_file
1518 .with_guarded_label_values(&metrics_labels);
1519 let write_bytes = GLOBAL_SINK_METRICS
1520 .iceberg_write_bytes
1521 .with_guarded_label_values(&metrics_labels);
1522
1523 let schema = table.metadata().current_schema();
1525 let partition_spec = table.metadata().default_partition_spec();
1526 let fanout_enabled = !partition_spec.fields().is_empty();
1527 let use_deletion_vectors = table.metadata().format_version() >= FormatVersion::V3;
1528
1529 let unique_uuid_suffix = Uuid::now_v7();
1531
1532 let parquet_writer_properties = WriterProperties::builder()
1533 .set_compression(config.get_parquet_compression())
1534 .set_max_row_group_size(config.write_parquet_max_row_group_rows())
1535 .set_created_by(PARQUET_CREATED_BY.to_owned())
1536 .build();
1537
1538 let data_file_builder = {
1539 let parquet_writer_builder =
1540 ParquetWriterBuilder::new(parquet_writer_properties.clone(), schema.clone());
1541 let rolling_writer_builder = RollingFileWriterBuilder::new(
1542 parquet_writer_builder,
1543 (config.target_file_size_mb() * 1024 * 1024) as usize,
1544 table.file_io().clone(),
1545 DefaultLocationGenerator::new(table.metadata().clone())
1546 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1547 DefaultFileNameGenerator::new(
1548 writer_param.actor_id.to_string(),
1549 Some(unique_uuid_suffix.to_string()),
1550 iceberg::spec::DataFileFormat::Parquet,
1551 ),
1552 );
1553 DataFileWriterBuilder::new(rolling_writer_builder)
1554 };
1555 let position_delete_builder = if use_deletion_vectors {
1556 let location_generator = DefaultLocationGenerator::new(table.metadata().clone())
1557 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
1558 PositionDeleteWriterBuilderType::DeletionVector(DeletionVectorWriterBuilder::new(
1559 table.file_io().clone(),
1560 location_generator,
1561 DefaultFileNameGenerator::new(
1562 writer_param.actor_id.to_string(),
1563 Some(format!("delvec-{}", unique_uuid_suffix)),
1564 iceberg::spec::DataFileFormat::Puffin,
1565 ),
1566 ))
1567 } else {
1568 let parquet_writer_builder = ParquetWriterBuilder::new(
1569 parquet_writer_properties.clone(),
1570 POSITION_DELETE_SCHEMA.clone().into(),
1571 );
1572 let rolling_writer_builder = RollingFileWriterBuilder::new(
1573 parquet_writer_builder,
1574 (config.target_file_size_mb() * 1024 * 1024) as usize,
1575 table.file_io().clone(),
1576 DefaultLocationGenerator::new(table.metadata().clone())
1577 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1578 DefaultFileNameGenerator::new(
1579 writer_param.actor_id.to_string(),
1580 Some(format!("pos-del-{}", unique_uuid_suffix)),
1581 iceberg::spec::DataFileFormat::Parquet,
1582 ),
1583 );
1584 PositionDeleteWriterBuilderType::PositionDelete(PositionDeleteFileWriterBuilder::new(
1585 rolling_writer_builder,
1586 ))
1587 };
1588 let equality_delete_builder = {
1589 let eq_del_config = EqualityDeleteWriterConfig::new(
1590 unique_column_ids.clone(),
1591 table.metadata().current_schema().clone(),
1592 )
1593 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
1594 let parquet_writer_builder = ParquetWriterBuilder::new(
1595 parquet_writer_properties,
1596 Arc::new(
1597 arrow_schema_to_schema(eq_del_config.projected_arrow_schema_ref())
1598 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1599 ),
1600 );
1601 let rolling_writer_builder = RollingFileWriterBuilder::new(
1602 parquet_writer_builder,
1603 (config.target_file_size_mb() * 1024 * 1024) as usize,
1604 table.file_io().clone(),
1605 DefaultLocationGenerator::new(table.metadata().clone())
1606 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1607 DefaultFileNameGenerator::new(
1608 writer_param.actor_id.to_string(),
1609 Some(format!("eq-del-{}", unique_uuid_suffix)),
1610 iceberg::spec::DataFileFormat::Parquet,
1611 ),
1612 );
1613
1614 EqualityDeleteFileWriterBuilder::new(rolling_writer_builder, eq_del_config)
1615 };
1616 let delta_builder = DeltaWriterBuilder::new(
1617 data_file_builder,
1618 position_delete_builder,
1619 equality_delete_builder,
1620 unique_column_ids,
1621 schema.clone(),
1622 );
1623 let original_arrow_schema = Arc::new(
1624 schema_to_arrow_schema(table.metadata().current_schema())
1625 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1626 );
1627 let schema_with_extra_op_column = {
1628 let mut new_fields = original_arrow_schema.fields().iter().cloned().collect_vec();
1629 new_fields.push(Arc::new(ArrowField::new(
1630 "op".to_owned(),
1631 ArrowDataType::Int32,
1632 false,
1633 )));
1634 Arc::new(ArrowSchema::new(new_fields))
1635 };
1636 let writer_builder = TaskWriterBuilderWrapper::new(
1637 MonitoredGeneralWriterBuilder::new(
1638 delta_builder,
1639 write_qps.clone(),
1640 write_latency.clone(),
1641 ),
1642 fanout_enabled,
1643 schema.clone(),
1644 partition_spec.clone(),
1645 true,
1646 );
1647 let inner_writer = Some(Box::new(
1648 writer_builder
1649 .build()
1650 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1651 ) as Box<dyn IcebergWriter>);
1652 Ok(Self {
1653 arrow_schema: original_arrow_schema,
1654 metrics: IcebergWriterMetrics {
1655 _write_qps: write_qps,
1656 _write_latency: write_latency,
1657 write_bytes,
1658 },
1659 table,
1660 writer: IcebergWriterDispatch::Upsert {
1661 writer: inner_writer,
1662 writer_builder,
1663 arrow_schema_with_op_column: schema_with_extra_op_column,
1664 },
1665 project_idx_vec: {
1666 if let Some(extra_partition_col_idx) = extra_partition_col_idx {
1667 ProjectIdxVec::Prepare(*extra_partition_col_idx)
1668 } else {
1669 ProjectIdxVec::None
1670 }
1671 },
1672 })
1673 }
1674}
1675
1676#[async_trait]
1677impl SinkWriter for IcebergSinkWriter {
1678 type CommitMetadata = Option<SinkMetadata>;
1679
1680 async fn begin_epoch(&mut self, _epoch: u64) -> Result<()> {
1682 let Self::Created(args) = self else {
1683 return Ok(());
1684 };
1685
1686 let table = create_and_validate_table_impl(&args.config, &args.sink_param).await?;
1687 let inner = match &args.unique_column_ids {
1688 Some(unique_column_ids) => IcebergSinkWriterInner::build_upsert(
1689 &args.config,
1690 table,
1691 unique_column_ids.clone(),
1692 &args.writer_param,
1693 )?,
1694 None => {
1695 IcebergSinkWriterInner::build_append_only(&args.config, table, &args.writer_param)?
1696 }
1697 };
1698
1699 *self = IcebergSinkWriter::Initialized(inner);
1700 Ok(())
1701 }
1702
1703 async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> {
1705 let Self::Initialized(inner) = self else {
            unreachable!("IcebergSinkWriter should be initialized in `begin_epoch` before `write_batch`");
1707 };
1708
1709 match &mut inner.writer {
1711 IcebergWriterDispatch::Append {
1712 writer,
1713 writer_builder,
1714 } => {
1715 if writer.is_none() {
1716 *writer = Some(Box::new(
1717 writer_builder
1718 .build()
1719 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1720 ));
1721 }
1722 }
1723 IcebergWriterDispatch::Upsert {
1724 writer,
1725 writer_builder,
1726 ..
1727 } => {
1728 if writer.is_none() {
1729 *writer = Some(Box::new(
1730 writer_builder
1731 .build()
1732 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1733 ));
1734 }
1735 }
1736 };
1737
1738 let (mut chunk, ops) = chunk.compact_vis().into_parts();
1740 match &mut inner.project_idx_vec {
1741 ProjectIdxVec::None => {}
1742 ProjectIdxVec::Prepare(idx) => {
1743 if *idx >= chunk.columns().len() {
1744 return Err(SinkError::Iceberg(anyhow!(
1745 "invalid extra partition column index {}",
1746 idx
1747 )));
1748 }
1749 let project_idx_vec = (0..*idx)
1750 .chain(*idx + 1..chunk.columns().len())
1751 .collect_vec();
1752 chunk = chunk.project(&project_idx_vec);
1753 inner.project_idx_vec = ProjectIdxVec::Done(project_idx_vec);
1754 }
1755 ProjectIdxVec::Done(idx_vec) => {
1756 chunk = chunk.project(idx_vec);
1757 }
1758 }
1759 if ops.is_empty() {
1760 return Ok(());
1761 }
1762 let write_batch_size = chunk.estimated_heap_size();
1763 let batch = match &inner.writer {
1764 IcebergWriterDispatch::Append { .. } => {
1765 let filters =
1767 chunk.visibility() & ops.iter().map(|op| *op == Op::Insert).collect::<Bitmap>();
1768 chunk.set_visibility(filters);
1769 IcebergArrowConvert
1770 .to_record_batch(inner.arrow_schema.clone(), &chunk.compact_vis())
1771 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?
1772 }
1773 IcebergWriterDispatch::Upsert {
1774 arrow_schema_with_op_column,
1775 ..
1776 } => {
1777 let chunk = IcebergArrowConvert
1778 .to_record_batch(inner.arrow_schema.clone(), &chunk)
1779 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
1780 let ops = Arc::new(Int32Array::from(
1781 ops.iter()
1782 .map(|op| match op {
1783 Op::UpdateInsert | Op::Insert => INSERT_OP,
1784 Op::UpdateDelete | Op::Delete => DELETE_OP,
1785 })
1786 .collect_vec(),
1787 ));
1788 let mut columns = chunk.columns().to_vec();
1789 columns.push(ops);
1790 RecordBatch::try_new(arrow_schema_with_op_column.clone(), columns)
1791 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?
1792 }
1793 };
1794
1795 let writer = inner.writer.get_writer().unwrap();
1796 writer
1797 .write(batch)
1798 .instrument_await("iceberg_write")
1799 .await
1800 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
1801 inner.metrics.write_bytes.inc_by(write_batch_size as _);
1802 Ok(())
1803 }
1804
1805 async fn barrier(&mut self, is_checkpoint: bool) -> Result<Option<SinkMetadata>> {
1808 let Self::Initialized(inner) = self else {
1809 unreachable!("IcebergSinkWriter should be initialized before barrier");
1810 };
1811
1812 if !is_checkpoint {
1814 return Ok(None);
1815 }
1816
1817 let close_result = match &mut inner.writer {
1818 IcebergWriterDispatch::Append {
1819 writer,
1820 writer_builder,
1821 } => {
1822 let close_result = match writer.take() {
1823 Some(mut writer) => {
1824 Some(writer.close().instrument_await("iceberg_close").await)
1825 }
1826 _ => None,
1827 };
1828 match writer_builder.build() {
1829 Ok(new_writer) => {
1830 *writer = Some(Box::new(new_writer));
1831 }
1832 _ => {
1833 warn!("Failed to build new writer after close");
1836 }
1837 }
1838 close_result
1839 }
1840 IcebergWriterDispatch::Upsert {
1841 writer,
1842 writer_builder,
1843 ..
1844 } => {
1845 let close_result = match writer.take() {
1846 Some(mut writer) => {
1847 Some(writer.close().instrument_await("iceberg_close").await)
1848 }
1849 _ => None,
1850 };
1851 match writer_builder.build() {
1852 Ok(new_writer) => {
1853 *writer = Some(Box::new(new_writer));
1854 }
1855 _ => {
1856 warn!("Failed to build new writer after close");
1859 }
1860 }
1861 close_result
1862 }
1863 };
1864
1865 match close_result {
1866 Some(Ok(result)) => {
1867 let format_version = inner.table.metadata().format_version();
1868 let partition_type = inner.table.metadata().default_partition_type();
1869 let data_files = result
1870 .into_iter()
1871 .map(|f| {
1872 let truncated = truncate_datafile(f);
1874 SerializedDataFile::try_from(truncated, partition_type, format_version)
1875 .map_err(|err| SinkError::Iceberg(anyhow!(err)))
1876 })
1877 .collect::<Result<Vec<_>>>()?;
1878 Ok(Some(SinkMetadata::try_from(&IcebergCommitResult {
1879 data_files,
1880 schema_id: inner.table.metadata().current_schema_id(),
1881 partition_spec_id: inner.table.metadata().default_partition_spec_id(),
1882 })?))
1883 }
1884 Some(Err(err)) => Err(SinkError::Iceberg(anyhow!(err))),
1885 None => Err(SinkError::Iceberg(anyhow!("No writer to close"))),
1886 }
1887 }
1888}
1889
1890const SCHEMA_ID: &str = "schema_id";
1891const PARTITION_SPEC_ID: &str = "partition_spec_id";
1892const DATA_FILES: &str = "data_files";
1893
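/// Column bounds whose serialized size exceeds this many bytes are dropped from
/// `DataFile` metadata in `truncate_datafile`, keeping the sink metadata sent to
/// the commit coordinator small.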
const MAX_COLUMN_STAT_SIZE: usize = 10240;

fn truncate_datafile(mut data_file: DataFile) -> DataFile {
1918 data_file.lower_bounds.retain(|field_id, datum| {
1920 let size = match datum.to_bytes() {
1922 Ok(bytes) => bytes.len(),
1923 Err(_) => 0,
1924 };
1925
1926 if size > MAX_COLUMN_STAT_SIZE {
1927 tracing::debug!(
1928 field_id = field_id,
1929 size = size,
1930 "Truncating large lower_bound statistic"
1931 );
1932 return false;
1933 }
1934 true
1935 });
1936
1937 data_file.upper_bounds.retain(|field_id, datum| {
1939 let size = match datum.to_bytes() {
1941 Ok(bytes) => bytes.len(),
1942 Err(_) => 0,
1943 };
1944
1945 if size > MAX_COLUMN_STAT_SIZE {
1946 tracing::debug!(
1947 field_id = field_id,
1948 size = size,
1949 "Truncating large upper_bound statistic"
1950 );
1951 return false;
1952 }
1953 true
1954 });
1955
1956 data_file
1957}
1958
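/// Per-checkpoint commit payload exchanged with the sink coordinator: the data
/// files written by one sink writer plus the schema and partition-spec ids they
/// were written against, round-tripped as JSON inside [`SinkMetadata`].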
1959#[derive(Default, Clone)]
1960struct IcebergCommitResult {
1961 schema_id: i32,
1962 partition_spec_id: i32,
1963 data_files: Vec<SerializedDataFile>,
1964}
1965
1966impl IcebergCommitResult {
1967 fn try_from(value: &SinkMetadata) -> Result<Self> {
1968 if let Some(Serialized(v)) = &value.metadata {
1969 let mut values = if let serde_json::Value::Object(v) =
1970 serde_json::from_slice::<serde_json::Value>(&v.metadata)
1971 .context("Can't parse iceberg sink metadata")?
1972 {
1973 v
1974 } else {
1975 bail!("iceberg sink metadata should be an object");
1976 };
1977
1978 let schema_id;
1979 if let Some(serde_json::Value::Number(value)) = values.remove(SCHEMA_ID) {
1980 schema_id = value
1981 .as_u64()
1982 .ok_or_else(|| anyhow!("schema_id should be a u64"))?;
1983 } else {
1984 bail!("iceberg sink metadata should have schema_id");
1985 }
1986
1987 let partition_spec_id;
1988 if let Some(serde_json::Value::Number(value)) = values.remove(PARTITION_SPEC_ID) {
1989 partition_spec_id = value
1990 .as_u64()
1991 .ok_or_else(|| anyhow!("partition_spec_id should be a u64"))?;
1992 } else {
1993 bail!("iceberg sink metadata should have partition_spec_id");
1994 }
1995
1996 let data_files: Vec<SerializedDataFile>;
1997 if let serde_json::Value::Array(values) = values
1998 .remove(DATA_FILES)
1999 .ok_or_else(|| anyhow!("iceberg sink metadata should have data_files object"))?
2000 {
                data_files = values
                    .into_iter()
                    .map(from_value::<SerializedDataFile>)
                    .collect::<std::result::Result<_, _>>()
                    .context("Failed to deserialize data_files in iceberg sink metadata")?;
2006 } else {
2007 bail!("iceberg sink metadata should have data_files object");
2008 }
2009
2010 Ok(Self {
2011 schema_id: schema_id as i32,
2012 partition_spec_id: partition_spec_id as i32,
2013 data_files,
2014 })
2015 } else {
2016 bail!("Can't create iceberg sink write result from empty data!")
2017 }
2018 }
2019
2020 fn try_from_serialized_bytes(value: Vec<u8>) -> Result<Self> {
2021 let mut values = if let serde_json::Value::Object(value) =
2022 serde_json::from_slice::<serde_json::Value>(&value)
2023 .context("Can't parse iceberg sink metadata")?
2024 {
2025 value
2026 } else {
2027 bail!("iceberg sink metadata should be an object");
2028 };
2029
2030 let schema_id;
2031 if let Some(serde_json::Value::Number(value)) = values.remove(SCHEMA_ID) {
2032 schema_id = value
2033 .as_u64()
2034 .ok_or_else(|| anyhow!("schema_id should be a u64"))?;
2035 } else {
2036 bail!("iceberg sink metadata should have schema_id");
2037 }
2038
2039 let partition_spec_id;
2040 if let Some(serde_json::Value::Number(value)) = values.remove(PARTITION_SPEC_ID) {
2041 partition_spec_id = value
2042 .as_u64()
2043 .ok_or_else(|| anyhow!("partition_spec_id should be a u64"))?;
2044 } else {
2045 bail!("iceberg sink metadata should have partition_spec_id");
2046 }
2047
2048 let data_files: Vec<SerializedDataFile>;
2049 if let serde_json::Value::Array(values) = values
2050 .remove(DATA_FILES)
2051 .ok_or_else(|| anyhow!("iceberg sink metadata should have data_files object"))?
2052 {
            data_files = values
                .into_iter()
                .map(from_value::<SerializedDataFile>)
                .collect::<std::result::Result<_, _>>()
                .context("Failed to deserialize data_files in iceberg sink metadata")?;
2058 } else {
2059 bail!("iceberg sink metadata should have data_files object");
2060 }
2061
2062 Ok(Self {
2063 schema_id: schema_id as i32,
2064 partition_spec_id: partition_spec_id as i32,
2065 data_files,
2066 })
2067 }
2068}
2069
2070impl<'a> TryFrom<&'a IcebergCommitResult> for SinkMetadata {
2071 type Error = SinkError;
2072
2073 fn try_from(value: &'a IcebergCommitResult) -> std::result::Result<SinkMetadata, Self::Error> {
2074 let json_data_files = serde_json::Value::Array(
2075 value
2076 .data_files
2077 .iter()
2078 .map(serde_json::to_value)
2079 .collect::<std::result::Result<Vec<serde_json::Value>, _>>()
2080 .context("Can't serialize data files to json")?,
2081 );
2082 let json_value = serde_json::Value::Object(
2083 vec![
2084 (
2085 SCHEMA_ID.to_owned(),
2086 serde_json::Value::Number(value.schema_id.into()),
2087 ),
2088 (
2089 PARTITION_SPEC_ID.to_owned(),
2090 serde_json::Value::Number(value.partition_spec_id.into()),
2091 ),
2092 (DATA_FILES.to_owned(), json_data_files),
2093 ]
2094 .into_iter()
2095 .collect(),
2096 );
2097 Ok(SinkMetadata {
2098 metadata: Some(Serialized(SerializedMetadata {
2099 metadata: serde_json::to_vec(&json_value)
2100 .context("Can't serialize iceberg sink metadata")?,
2101 })),
2102 })
2103 }
2104}
2105
2106impl TryFrom<IcebergCommitResult> for Vec<u8> {
2107 type Error = SinkError;
2108
2109 fn try_from(value: IcebergCommitResult) -> std::result::Result<Vec<u8>, Self::Error> {
2110 let json_data_files = serde_json::Value::Array(
2111 value
2112 .data_files
2113 .iter()
2114 .map(serde_json::to_value)
2115 .collect::<std::result::Result<Vec<serde_json::Value>, _>>()
2116 .context("Can't serialize data files to json")?,
2117 );
2118 let json_value = serde_json::Value::Object(
2119 vec![
2120 (
2121 SCHEMA_ID.to_owned(),
2122 serde_json::Value::Number(value.schema_id.into()),
2123 ),
2124 (
2125 PARTITION_SPEC_ID.to_owned(),
2126 serde_json::Value::Number(value.partition_spec_id.into()),
2127 ),
2128 (DATA_FILES.to_owned(), json_data_files),
2129 ]
2130 .into_iter()
2131 .collect(),
2132 );
2133 Ok(serde_json::to_vec(&json_value).context("Can't serialize iceberg sink metadata")?)
2134 }
2135}
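
/// Coordinator-side committer for the iceberg sink: collects the data files
/// produced by the sink writers, appends them to the iceberg table in a single
/// transaction, and optionally notifies the compaction scheduler.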
2136pub struct IcebergSinkCommitter {
2137 catalog: Arc<dyn Catalog>,
2138 table: Table,
2139 pub last_commit_epoch: u64,
2140 pub(crate) sink_id: SinkId,
2141 pub(crate) config: IcebergConfig,
2142 pub(crate) param: SinkParam,
2143 commit_retry_num: u32,
2144 pub(crate) iceberg_compact_stat_sender: Option<UnboundedSender<IcebergSinkCompactionUpdate>>,
2145}
2146
2147impl IcebergSinkCommitter {
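    /// Reload the table from the catalog and verify that its current schema id
    /// and default partition spec id still match the ones used by the writers;
    /// schema or partition evolution between write and commit is rejected.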
2148 async fn reload_table(
2151 catalog: &dyn Catalog,
2152 table_ident: &TableIdent,
2153 schema_id: i32,
2154 partition_spec_id: i32,
2155 ) -> Result<Table> {
2156 let table = catalog
2157 .load_table(table_ident)
2158 .await
2159 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
2160 if table.metadata().current_schema_id() != schema_id {
2161 return Err(SinkError::Iceberg(anyhow!(
2162 "Schema evolution not supported, expect schema id {}, but got {}",
2163 schema_id,
2164 table.metadata().current_schema_id()
2165 )));
2166 }
2167 if table.metadata().default_partition_spec_id() != partition_spec_id {
2168 return Err(SinkError::Iceberg(anyhow!(
2169 "Partition evolution not supported, expect partition spec id {}, but got {}",
2170 partition_spec_id,
2171 table.metadata().default_partition_spec_id()
2172 )));
2173 }
2174 Ok(table)
2175 }
2176}
2177
2178#[async_trait]
2179impl SinglePhaseCommitCoordinator for IcebergSinkCommitter {
2180 async fn init(&mut self) -> Result<()> {
2181 tracing::info!(
2182 sink_id = %self.param.sink_id,
2183 "Iceberg sink coordinator initialized",
2184 );
2185
2186 Ok(())
2187 }
2188
2189 async fn commit_data(&mut self, epoch: u64, metadata: Vec<SinkMetadata>) -> Result<()> {
2190 tracing::debug!("Starting iceberg direct commit in epoch {epoch}");
2191
2192 if metadata.is_empty() {
2193 tracing::debug!(?epoch, "No datafile to commit");
2194 return Ok(());
2195 }
2196
2197 if let Some((write_results, snapshot_id)) = self.pre_commit_inner(epoch, metadata)? {
2199 self.commit_data_impl(epoch, write_results, snapshot_id)
2200 .await?;
2201 }
2202
2203 Ok(())
2204 }
2205
2206 async fn commit_schema_change(
2207 &mut self,
2208 epoch: u64,
2209 schema_change: PbSinkSchemaChange,
2210 ) -> Result<()> {
2211 tracing::info!(
2212 "Committing schema change {:?} in epoch {}",
2213 schema_change,
2214 epoch
2215 );
2216 self.commit_schema_change_impl(schema_change).await?;
2217 tracing::info!("Successfully committed schema change in epoch {}", epoch);
2218
2219 Ok(())
2220 }
2221}
2222
2223#[async_trait]
2224impl TwoPhaseCommitCoordinator for IcebergSinkCommitter {
2225 async fn init(&mut self) -> Result<()> {
2226 tracing::info!(
2227 sink_id = %self.param.sink_id,
2228 "Iceberg sink coordinator initialized",
2229 );
2230
2231 Ok(())
2232 }
2233
2234 async fn pre_commit(
2235 &mut self,
2236 epoch: u64,
2237 metadata: Vec<SinkMetadata>,
2238 _schema_change: Option<PbSinkSchemaChange>,
2239 ) -> Result<Option<Vec<u8>>> {
2240 tracing::debug!("Starting iceberg pre commit in epoch {epoch}");
2241
2242 let (write_results, snapshot_id) = match self.pre_commit_inner(epoch, metadata)? {
2243 Some((write_results, snapshot_id)) => (write_results, snapshot_id),
2244 None => {
2245 tracing::debug!(?epoch, "no data to pre commit");
2246 return Ok(None);
2247 }
2248 };
2249
2250 let mut write_results_bytes = Vec::new();
2251 for each_parallelism_write_result in write_results {
2252 let each_parallelism_write_result_bytes: Vec<u8> =
2253 each_parallelism_write_result.try_into()?;
2254 write_results_bytes.push(each_parallelism_write_result_bytes);
2255 }
2256
2257 let snapshot_id_bytes: Vec<u8> = snapshot_id.to_le_bytes().to_vec();
2258 write_results_bytes.push(snapshot_id_bytes);
2259
2260 let pre_commit_metadata_bytes: Vec<u8> = serialize_metadata(write_results_bytes);
2261 Ok(Some(pre_commit_metadata_bytes))
2262 }
2263
2264 async fn commit_data(&mut self, epoch: u64, commit_metadata: Vec<u8>) -> Result<()> {
2265 tracing::debug!("Starting iceberg commit in epoch {epoch}");
2266
2267 if commit_metadata.is_empty() {
2268 tracing::debug!(?epoch, "No datafile to commit");
2269 return Ok(());
2270 }
2271
2272 let mut payload = deserialize_metadata(commit_metadata);
2274 if payload.is_empty() {
2275 return Err(SinkError::Iceberg(anyhow!(
2276 "Invalid commit metadata: empty payload"
2277 )));
2278 }
2279
2280 let snapshot_id_bytes = payload.pop().ok_or_else(|| {
2282 SinkError::Iceberg(anyhow!("Invalid commit metadata: missing snapshot_id"))
2283 })?;
2284 let snapshot_id = i64::from_le_bytes(
2285 snapshot_id_bytes
2286 .try_into()
2287 .map_err(|_| SinkError::Iceberg(anyhow!("Invalid snapshot id bytes")))?,
2288 );
2289
2290 let write_results = payload
2292 .into_iter()
2293 .map(IcebergCommitResult::try_from_serialized_bytes)
2294 .collect::<Result<Vec<_>>>()?;
2295
2296 let snapshot_committed = self
2297 .is_snapshot_id_in_iceberg(&self.config, snapshot_id)
2298 .await?;
2299
2300 if snapshot_committed {
2301 tracing::info!(
2302 "Snapshot id {} already committed in iceberg table, skip committing again.",
2303 snapshot_id
2304 );
2305 return Ok(());
2306 }
2307
2308 self.commit_data_impl(epoch, write_results, snapshot_id)
2309 .await
2310 }
2311
2312 async fn commit_schema_change(
2313 &mut self,
2314 epoch: u64,
2315 schema_change: PbSinkSchemaChange,
2316 ) -> Result<()> {
2317 let schema_updated = self.check_schema_change_applied(&schema_change)?;
2318 if schema_updated {
2319 tracing::info!("Schema change already committed in epoch {}, skip", epoch);
2320 return Ok(());
2321 }
2322
2323 tracing::info!(
2324 "Committing schema change {:?} in epoch {}",
2325 schema_change,
2326 epoch
2327 );
2328 self.commit_schema_change_impl(schema_change).await?;
2329 tracing::info!("Successfully committed schema change in epoch {epoch}");
2330
2331 Ok(())
2332 }
2333
2334 async fn abort(&mut self, _epoch: u64, _commit_metadata: Vec<u8>) {
2335 tracing::debug!("Abort not implemented yet");
2337 }
2338}
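
// A hedged sketch of the bytes exchanged between `pre_commit` and `commit_data`
// above: each parallelism's `IcebergCommitResult` is serialized to JSON bytes,
// the 8-byte little-endian snapshot id is appended as the final element, and
// the whole `Vec<Vec<u8>>` is wrapped by `serialize_metadata`:
//
//   [ commit_result_bytes_0, ..., commit_result_bytes_n, snapshot_id_le_bytes ]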
2339
2340impl IcebergSinkCommitter {
2342 fn pre_commit_inner(
2343 &mut self,
2344 _epoch: u64,
2345 metadata: Vec<SinkMetadata>,
2346 ) -> Result<Option<(Vec<IcebergCommitResult>, i64)>> {
2347 let write_results: Vec<IcebergCommitResult> = metadata
2348 .iter()
2349 .map(IcebergCommitResult::try_from)
2350 .collect::<Result<Vec<IcebergCommitResult>>>()?;
2351
2352 if write_results.is_empty() || write_results.iter().all(|r| r.data_files.is_empty()) {
2354 return Ok(None);
2355 }
2356
2357 let expect_schema_id = write_results[0].schema_id;
2358 let expect_partition_spec_id = write_results[0].partition_spec_id;
2359
2360 if write_results
2362 .iter()
2363 .any(|r| r.schema_id != expect_schema_id)
2364 || write_results
2365 .iter()
2366 .any(|r| r.partition_spec_id != expect_partition_spec_id)
2367 {
2368 return Err(SinkError::Iceberg(anyhow!(
2369 "schema_id and partition_spec_id should be the same in all write results"
2370 )));
2371 }
2372
2373 let snapshot_id = FastAppendAction::generate_snapshot_id(&self.table);
2374
2375 Ok(Some((write_results, snapshot_id)))
2376 }
2377
2378 async fn commit_data_impl(
2379 &mut self,
2380 epoch: u64,
2381 write_results: Vec<IcebergCommitResult>,
2382 snapshot_id: i64,
2383 ) -> Result<()> {
2384 assert!(
2386 !write_results.is_empty() && !write_results.iter().all(|r| r.data_files.is_empty())
2387 );
2388
2389 self.wait_for_snapshot_limit().await?;
2391
2392 let expect_schema_id = write_results[0].schema_id;
2393 let expect_partition_spec_id = write_results[0].partition_spec_id;
2394
2395 self.table = Self::reload_table(
2397 self.catalog.as_ref(),
2398 self.table.identifier(),
2399 expect_schema_id,
2400 expect_partition_spec_id,
2401 )
2402 .await?;
2403
2404 let Some(schema) = self.table.metadata().schema_by_id(expect_schema_id) else {
2405 return Err(SinkError::Iceberg(anyhow!(
2406 "Can't find schema by id {}",
2407 expect_schema_id
2408 )));
2409 };
2410 let Some(partition_spec) = self
2411 .table
2412 .metadata()
2413 .partition_spec_by_id(expect_partition_spec_id)
2414 else {
2415 return Err(SinkError::Iceberg(anyhow!(
2416 "Can't find partition spec by id {}",
2417 expect_partition_spec_id
2418 )));
2419 };
2420 let partition_type = partition_spec
2421 .as_ref()
2422 .clone()
2423 .partition_type(schema)
2424 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
2425
2426 let data_files = write_results
2427 .into_iter()
2428 .flat_map(|r| {
2429 r.data_files.into_iter().map(|f| {
2430 f.try_into(expect_partition_spec_id, &partition_type, schema)
2431 .map_err(|err| SinkError::Iceberg(anyhow!(err)))
2432 })
2433 })
2434 .collect::<Result<Vec<DataFile>>>()?;
2435 let retry_strategy = ExponentialBackoff::from_millis(10)
2440 .max_delay(Duration::from_secs(60))
2441 .map(jitter)
2442 .take(self.commit_retry_num as usize);
2443 let catalog = self.catalog.clone();
2444 let table_ident = self.table.identifier().clone();
2445
2446 enum CommitError {
            // Reloading the table failed; treated as non-retriable below.
            ReloadTable(SinkError),
            // Applying or committing the transaction failed; retried with backoff.
            Commit(SinkError),
        }
2454
2455 let table = RetryIf::spawn(
2456 retry_strategy,
2457 || async {
2458 let table = Self::reload_table(
2460 catalog.as_ref(),
2461 &table_ident,
2462 expect_schema_id,
2463 expect_partition_spec_id,
2464 )
2465 .await
2466 .map_err(|e| {
2467 tracing::error!(error = %e.as_report(), "Failed to reload iceberg table");
2468 CommitError::ReloadTable(e)
2469 })?;
2470
2471 let txn = Transaction::new(&table);
2472 let append_action = txn
2473 .fast_append()
2474 .set_snapshot_id(snapshot_id)
2475 .set_target_branch(commit_branch(
2476 self.config.r#type.as_str(),
2477 self.config.write_mode,
2478 ))
2479 .add_data_files(data_files.clone());
2480
2481 let tx = append_action.apply(txn).map_err(|err| {
2482 let err: IcebergError = err.into();
                    tracing::error!(error = %err.as_report(), "Failed to apply append action to iceberg transaction");
2484 CommitError::Commit(SinkError::Iceberg(anyhow!(err)))
2485 })?;
2486
2487 tx.commit(catalog.as_ref()).await.map_err(|err| {
2488 let err: IcebergError = err.into();
2489 tracing::error!(error = %err.as_report(), "Failed to commit iceberg table");
2490 CommitError::Commit(SinkError::Iceberg(anyhow!(err)))
2491 })
2492 },
2493 |err: &CommitError| {
2494 match err {
2496 CommitError::Commit(_) => {
2497 tracing::warn!("Commit failed, will retry");
2498 true
2499 }
2500 CommitError::ReloadTable(_) => {
2501 tracing::error!(
2502 "reload_table failed with non-retriable error, will not retry"
2503 );
2504 false
2505 }
2506 }
2507 },
2508 )
2509 .await
2510 .map_err(|e| match e {
2511 CommitError::ReloadTable(e) | CommitError::Commit(e) => e,
2512 })?;
2513 self.table = table;
2514
2515 let snapshot_num = self.table.metadata().snapshots().count();
2516 let catalog_name = self.config.common.catalog_name();
2517 let table_name = self.table.identifier().to_string();
2518 let metrics_labels = [&self.param.sink_name, &catalog_name, &table_name];
2519 GLOBAL_SINK_METRICS
2520 .iceberg_snapshot_num
2521 .with_guarded_label_values(&metrics_labels)
2522 .set(snapshot_num as i64);
2523
2524 tracing::debug!("Succeeded to commit to iceberg table in epoch {epoch}.");
2525
2526 if let Some(iceberg_compact_stat_sender) = &self.iceberg_compact_stat_sender
2527 && self.config.enable_compaction
2528 && iceberg_compact_stat_sender
2529 .send(IcebergSinkCompactionUpdate {
2530 sink_id: self.sink_id,
2531 compaction_interval: self.config.compaction_interval_sec(),
2532 force_compaction: false,
2533 })
2534 .is_err()
2535 {
2536 warn!("failed to send iceberg compaction stats");
2537 }
2538
2539 Ok(())
2540 }
2541
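    /// Check whether `snapshot_id` already exists in the iceberg table; used by
    /// `commit_data` to skip re-committing when a previous attempt already
    /// reached the catalog.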
2542 async fn is_snapshot_id_in_iceberg(
2546 &self,
2547 iceberg_config: &IcebergConfig,
2548 snapshot_id: i64,
2549 ) -> Result<bool> {
        let table = iceberg_config.load_table().await?;
        Ok(table.metadata().snapshot_by_id(snapshot_id).is_some())
2556 }
2557
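    /// Determine whether `schema_change` has already been applied to the iceberg
    /// table: `Ok(false)` if the current schema still matches the original
    /// schema, `Ok(true)` if it matches the original schema plus the added
    /// columns, and an error if it matches neither.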
2558 fn check_schema_change_applied(&self, schema_change: &PbSinkSchemaChange) -> Result<bool> {
2561 let current_schema = self.table.metadata().current_schema();
2562 let current_arrow_schema = schema_to_arrow_schema(current_schema.as_ref())
2563 .context("Failed to convert schema")
2564 .map_err(SinkError::Iceberg)?;
2565
2566 let iceberg_arrow_convert = IcebergArrowConvert;
2567
2568 let schema_matches = |expected: &[ArrowField]| {
2569 if current_arrow_schema.fields().len() != expected.len() {
2570 return false;
2571 }
2572
2573 expected.iter().all(|expected_field| {
2574 current_arrow_schema.fields().iter().any(|current_field| {
2575 current_field.name() == expected_field.name()
2576 && current_field.data_type() == expected_field.data_type()
2577 })
2578 })
2579 };
2580
2581 let original_arrow_fields: Vec<ArrowField> = schema_change
2582 .original_schema
2583 .iter()
2584 .map(|pb_field| {
2585 let field = Field::from(pb_field);
2586 iceberg_arrow_convert
2587 .to_arrow_field(&field.name, &field.data_type)
2588 .context("Failed to convert field to arrow")
2589 .map_err(SinkError::Iceberg)
2590 })
2591 .collect::<Result<_>>()?;
2592
2593 if schema_matches(&original_arrow_fields) {
2595 tracing::debug!(
2596 "Current iceberg schema matches original_schema ({} columns); schema change not applied",
2597 original_arrow_fields.len()
2598 );
2599 return Ok(false);
2600 }
2601
2602 let Some(risingwave_pb::stream_plan::sink_schema_change::Op::AddColumns(add_columns_op)) =
2604 schema_change.op.as_ref()
2605 else {
2606 return Err(SinkError::Iceberg(anyhow!(
2607 "Unsupported sink schema change op in iceberg sink: {:?}",
2608 schema_change.op
2609 )));
2610 };
2611
2612 let add_arrow_fields: Vec<ArrowField> = add_columns_op
2613 .fields
2614 .iter()
2615 .map(|pb_field| {
2616 let field = Field::from(pb_field);
2617 iceberg_arrow_convert
2618 .to_arrow_field(&field.name, &field.data_type)
2619 .context("Failed to convert field to arrow")
2620 .map_err(SinkError::Iceberg)
2621 })
2622 .collect::<Result<_>>()?;
2623
2624 let mut expected_after_change = original_arrow_fields;
2625 expected_after_change.extend(add_arrow_fields);
2626
2627 if schema_matches(&expected_after_change) {
2629 tracing::debug!(
2630 "Current iceberg schema matches original_schema + add_columns ({} columns); schema change already applied",
2631 expected_after_change.len()
2632 );
2633 return Ok(true);
2634 }
2635
2636 Err(SinkError::Iceberg(anyhow!(
2637 "Current iceberg schema does not match either original_schema ({} cols) or original_schema + add_columns; cannot determine whether schema change is applied",
2638 schema_change.original_schema.len()
2639 )))
2640 }
2641
2642 async fn commit_schema_change_impl(&mut self, schema_change: PbSinkSchemaChange) -> Result<()> {
2646 use iceberg::spec::NestedField;
2647
2648 let Some(risingwave_pb::stream_plan::sink_schema_change::Op::AddColumns(add_columns_op)) =
2649 schema_change.op.as_ref()
2650 else {
2651 return Err(SinkError::Iceberg(anyhow!(
2652 "Unsupported sink schema change op in iceberg sink: {:?}",
2653 schema_change.op
2654 )));
2655 };
2656
2657 let add_columns = add_columns_op.fields.iter().map(Field::from).collect_vec();
2658
2659 let metadata = self.table.metadata();
2661 let mut next_field_id = metadata.last_column_id() + 1;
2662 tracing::debug!("Starting schema change, next_field_id: {}", next_field_id);
2663
2664 let iceberg_create_table_arrow_convert = IcebergCreateTableArrowConvert::default();
2666 let mut new_fields = Vec::new();
2667
2668 for field in &add_columns {
2669 let arrow_field = iceberg_create_table_arrow_convert
2671 .to_arrow_field(&field.name, &field.data_type)
2672 .with_context(|| format!("Failed to convert field '{}' to arrow", field.name))
2673 .map_err(SinkError::Iceberg)?;
2674
2675 let iceberg_type = iceberg::arrow::arrow_type_to_type(arrow_field.data_type())
2677 .map_err(|err| {
2678 SinkError::Iceberg(
2679 anyhow!(err).context("Failed to convert Arrow type to Iceberg type"),
2680 )
2681 })?;
2682
2683 let nested_field = Arc::new(NestedField::optional(
2685 next_field_id,
2686 &field.name,
2687 iceberg_type,
2688 ));
2689
2690 new_fields.push(nested_field);
2691 tracing::info!("Prepared field '{}' with ID {}", field.name, next_field_id);
2692 next_field_id += 1;
2693 }
2694
2695 tracing::info!(
2697 "Committing schema change to catalog for table {}",
2698 self.table.identifier()
2699 );
2700
2701 let txn = Transaction::new(&self.table);
2702 let action = txn.update_schema().add_fields(new_fields);
2703
2704 let updated_table = action
2705 .apply(txn)
2706 .context("Failed to apply schema update action")
2707 .map_err(SinkError::Iceberg)?
2708 .commit(self.catalog.as_ref())
2709 .await
2710 .context("Failed to commit table schema change")
2711 .map_err(SinkError::Iceberg)?;
2712
2713 self.table = updated_table;
2714
2715 tracing::info!(
2716 "Successfully committed schema change, added {} columns to iceberg table",
2717 add_columns.len()
2718 );
2719
2720 Ok(())
2721 }
2722
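    /// Count the snapshots committed since the most recent rewrite (`Replace`)
    /// snapshot, scanning from newest to oldest. For example, a newest-first
    /// history of [Append, Append, Replace, Append] yields 2.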
2723 fn count_snapshots_since_rewrite(&self) -> usize {
2726 let mut snapshots: Vec<_> = self.table.metadata().snapshots().collect();
2727 snapshots.sort_by_key(|b| std::cmp::Reverse(b.timestamp_ms()));
2728
2729 let mut count = 0;
2731 for snapshot in snapshots {
2732 let summary = snapshot.summary();
2734 match &summary.operation {
2735 Operation::Replace => {
2736 break;
2738 }
2739
2740 _ => {
2741 count += 1;
2743 }
2744 }
2745 }
2746
2747 count
2748 }
2749
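    /// When compaction is enabled and `max_snapshots_num_before_compaction` is
    /// set, block the commit until the number of snapshots since the last
    /// rewrite falls below the limit, asking the compactor to run with
    /// `force_compaction: true` and reloading the table every 30 seconds.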
2750 async fn wait_for_snapshot_limit(&mut self) -> Result<()> {
2752 if !self.config.enable_compaction {
2753 return Ok(());
2754 }
2755
2756 if let Some(max_snapshots) = self.config.max_snapshots_num_before_compaction {
2757 loop {
2758 let current_count = self.count_snapshots_since_rewrite();
2759
2760 if current_count < max_snapshots {
2761 tracing::info!(
2762 "Snapshot count check passed: {} < {}",
2763 current_count,
2764 max_snapshots
2765 );
2766 break;
2767 }
2768
2769 tracing::info!(
2770 "Snapshot count {} exceeds limit {}, waiting...",
2771 current_count,
2772 max_snapshots
2773 );
2774
2775 if let Some(iceberg_compact_stat_sender) = &self.iceberg_compact_stat_sender
2776 && iceberg_compact_stat_sender
2777 .send(IcebergSinkCompactionUpdate {
2778 sink_id: self.sink_id,
2779 compaction_interval: self.config.compaction_interval_sec(),
2780 force_compaction: true,
2781 })
2782 .is_err()
2783 {
2784 tracing::warn!("failed to send iceberg compaction stats");
2785 }
2786
2787 tokio::time::sleep(Duration::from_secs(30)).await;
2789
2790 self.table = self.config.load_table().await?;
2792 }
2793 }
2794 Ok(())
2795 }
2796}
2797
2798const MAP_KEY: &str = "key";
2799const MAP_VALUE: &str = "value";
2800
2801fn get_fields<'a>(
2802 our_field_type: &'a risingwave_common::types::DataType,
2803 data_type: &ArrowDataType,
2804 schema_fields: &mut HashMap<&'a str, &'a risingwave_common::types::DataType>,
2805) -> Option<ArrowFields> {
2806 match data_type {
2807 ArrowDataType::Struct(fields) => {
2808 match our_field_type {
2809 risingwave_common::types::DataType::Struct(struct_fields) => {
2810 struct_fields.iter().for_each(|(name, data_type)| {
2811 let res = schema_fields.insert(name, data_type);
2812 assert!(res.is_none())
2814 });
2815 }
2816 risingwave_common::types::DataType::Map(map_fields) => {
2817 schema_fields.insert(MAP_KEY, map_fields.key());
2818 schema_fields.insert(MAP_VALUE, map_fields.value());
2819 }
2820 risingwave_common::types::DataType::List(list) => {
2821 list.elem()
2822 .as_struct()
2823 .iter()
2824 .for_each(|(name, data_type)| {
2825 let res = schema_fields.insert(name, data_type);
2826 assert!(res.is_none())
2828 });
2829 }
2830 _ => {}
2831 };
2832 Some(fields.clone())
2833 }
2834 ArrowDataType::List(field) | ArrowDataType::Map(field, _) => {
2835 get_fields(our_field_type, field.data_type(), schema_fields)
2836 }
        _ => None,
    }
2839}
2840
2841fn check_compatibility(
2842 schema_fields: HashMap<&str, &risingwave_common::types::DataType>,
2843 fields: &ArrowFields,
2844) -> anyhow::Result<bool> {
2845 for arrow_field in fields {
2846 let our_field_type = schema_fields
2847 .get(arrow_field.name().as_str())
2848 .ok_or_else(|| anyhow!("Field {} not found in our schema", arrow_field.name()))?;
2849
2850 let converted_arrow_data_type = IcebergArrowConvert
2852 .to_arrow_field("", our_field_type)
2853 .map_err(|e| anyhow!(e))?
2854 .data_type()
2855 .clone();
2856
2857 let compatible = match (&converted_arrow_data_type, arrow_field.data_type()) {
2858 (ArrowDataType::Decimal128(_, _), ArrowDataType::Decimal128(_, _)) => true,
2859 (ArrowDataType::Binary, ArrowDataType::LargeBinary) => true,
2860 (ArrowDataType::LargeBinary, ArrowDataType::Binary) => true,
2861 (ArrowDataType::List(_), ArrowDataType::List(field))
2862 | (ArrowDataType::Map(_, _), ArrowDataType::Map(field, _)) => {
2863 let mut schema_fields = HashMap::new();
2864 get_fields(our_field_type, field.data_type(), &mut schema_fields)
2865 .is_none_or(|fields| check_compatibility(schema_fields, &fields).unwrap())
2866 }
2867 (ArrowDataType::Struct(_), ArrowDataType::Struct(fields)) => {
2869 let mut schema_fields = HashMap::new();
2870 our_field_type
2871 .as_struct()
2872 .iter()
2873 .for_each(|(name, data_type)| {
2874 let res = schema_fields.insert(name, data_type);
2875 assert!(res.is_none())
2877 });
2878 check_compatibility(schema_fields, fields)?
2879 }
2880 (left, right) => left.equals_datatype(right),
2888 };
2889 if !compatible {
2890 bail!(
2891 "field {}'s type is incompatible\nRisingWave converted data type: {}\niceberg's data type: {}",
2892 arrow_field.name(),
2893 converted_arrow_data_type,
2894 arrow_field.data_type()
2895 );
2896 }
2897 }
2898 Ok(true)
2899}
2900
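/// Check that every RisingWave field has a same-named, type-compatible field in
/// the iceberg Arrow schema. Field order is ignored, and a few representation
/// differences (e.g. `Binary` vs. `LargeBinary`, any decimal precision/scale)
/// are treated as compatible.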
2901pub fn try_matches_arrow_schema(rw_schema: &Schema, arrow_schema: &ArrowSchema) -> Result<()> {
2903 if rw_schema.fields.len() != arrow_schema.fields().len() {
2904 bail!(
2905 "Schema length mismatch, risingwave is {}, and iceberg is {}",
2906 rw_schema.fields.len(),
2907 arrow_schema.fields.len()
2908 );
2909 }
2910
2911 let mut schema_fields = HashMap::new();
2912 rw_schema.fields.iter().for_each(|field| {
2913 let res = schema_fields.insert(field.name.as_str(), &field.data_type);
2914 assert!(res.is_none())
2916 });
2917
2918 check_compatibility(schema_fields, &arrow_schema.fields)?;
2919 Ok(())
2920}
2921
2922fn serialize_metadata(metadata: Vec<Vec<u8>>) -> Vec<u8> {
2923 serde_json::to_vec(&metadata).unwrap()
2924}
2925
2926fn deserialize_metadata(bytes: Vec<u8>) -> Vec<Vec<u8>> {
2927 serde_json::from_slice(&bytes).unwrap()
2928}
2929
2930pub fn parse_partition_by_exprs(
2931 expr: String,
2932) -> std::result::Result<Vec<(String, Transform)>, anyhow::Error> {
2933 let re = Regex::new(r"(?<transform>\w+)(\(((?<n>\d+)?(?:,|(,\s)))?(?<field>\w+)\))?").unwrap();
2935 if !re.is_match(&expr) {
        bail!(
            "Invalid partition fields: {}\nHINT: Supported formats are column, transform(column), transform(n,column), transform(n, column)",
            expr
        )
2940 }
2941 let caps = re.captures_iter(&expr);
2942
2943 let mut partition_columns = vec![];
2944
2945 for mat in caps {
2946 let (column, transform) = if mat.name("n").is_none() && mat.name("field").is_none() {
2947 (&mat["transform"], Transform::Identity)
2948 } else {
2949 let mut func = mat["transform"].to_owned();
2950 if func == "bucket" || func == "truncate" {
2951 let n = &mat
2952 .name("n")
2953 .ok_or_else(|| anyhow!("The `n` must be set with `bucket` and `truncate`"))?
2954 .as_str();
2955 func = format!("{func}[{n}]");
2956 }
2957 (
2958 &mat["field"],
2959 Transform::from_str(&func)
2960 .with_context(|| format!("invalid transform function {}", func))?,
2961 )
2962 };
2963 partition_columns.push((column.to_owned(), transform));
2964 }
2965 Ok(partition_columns)
2966}
2967
2968pub fn commit_branch(sink_type: &str, write_mode: IcebergWriteMode) -> String {
2969 if should_enable_iceberg_cow(sink_type, write_mode) {
2970 ICEBERG_COW_BRANCH.to_owned()
2971 } else {
2972 MAIN_BRANCH.to_owned()
2973 }
2974}
2975
2976pub fn should_enable_iceberg_cow(sink_type: &str, write_mode: IcebergWriteMode) -> bool {
2977 sink_type == SINK_TYPE_UPSERT && write_mode == IcebergWriteMode::CopyOnWrite
2978}
2979
2980impl crate::with_options::WithOptions for IcebergWriteMode {}
2981
2982impl crate::with_options::WithOptions for FormatVersion {}
2983
2984impl crate::with_options::WithOptions for CompactionType {}
2985
2986#[cfg(test)]
2987mod test {
2988 use std::collections::BTreeMap;
2989
2990 use risingwave_common::array::arrow::arrow_schema_iceberg::FieldRef as ArrowFieldRef;
2991 use risingwave_common::types::{DataType, MapType, StructType};
2992
2993 use crate::connector_common::{IcebergCommon, IcebergTableIdentifier};
2994 use crate::sink::decouple_checkpoint_log_sink::ICEBERG_DEFAULT_COMMIT_CHECKPOINT_INTERVAL;
2995 use crate::sink::iceberg::{
2996 COMPACTION_INTERVAL_SEC, COMPACTION_MAX_SNAPSHOTS_NUM, CompactionType, ENABLE_COMPACTION,
2997 ENABLE_SNAPSHOT_EXPIRATION, FormatVersion, IcebergConfig, IcebergWriteMode,
2998 SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_FILES, SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_META_DATA,
2999 SNAPSHOT_EXPIRATION_MAX_AGE_MILLIS, SNAPSHOT_EXPIRATION_RETAIN_LAST, WRITE_MODE,
3000 };
3001
    pub const DEFAULT_ICEBERG_COMPACTION_INTERVAL: u64 = 3600;

    #[test]
3005 fn test_compatible_arrow_schema() {
3006 use arrow_schema_iceberg::{DataType as ArrowDataType, Field as ArrowField};
3007
3008 use super::*;
3009 let risingwave_schema = Schema::new(vec![
3010 Field::with_name(DataType::Int32, "a"),
3011 Field::with_name(DataType::Int32, "b"),
3012 Field::with_name(DataType::Int32, "c"),
3013 ]);
3014 let arrow_schema = ArrowSchema::new(vec![
3015 ArrowField::new("a", ArrowDataType::Int32, false),
3016 ArrowField::new("b", ArrowDataType::Int32, false),
3017 ArrowField::new("c", ArrowDataType::Int32, false),
3018 ]);
3019
3020 try_matches_arrow_schema(&risingwave_schema, &arrow_schema).unwrap();
3021
3022 let risingwave_schema = Schema::new(vec![
3023 Field::with_name(DataType::Int32, "d"),
3024 Field::with_name(DataType::Int32, "c"),
3025 Field::with_name(DataType::Int32, "a"),
3026 Field::with_name(DataType::Int32, "b"),
3027 ]);
3028 let arrow_schema = ArrowSchema::new(vec![
3029 ArrowField::new("a", ArrowDataType::Int32, false),
3030 ArrowField::new("b", ArrowDataType::Int32, false),
3031 ArrowField::new("d", ArrowDataType::Int32, false),
3032 ArrowField::new("c", ArrowDataType::Int32, false),
3033 ]);
3034 try_matches_arrow_schema(&risingwave_schema, &arrow_schema).unwrap();
3035
3036 let risingwave_schema = Schema::new(vec![
3037 Field::with_name(
3038 DataType::Struct(StructType::new(vec![
3039 ("a1", DataType::Int32),
3040 (
3041 "a2",
3042 DataType::Struct(StructType::new(vec![
3043 ("a21", DataType::Bytea),
3044 (
3045 "a22",
3046 DataType::Map(MapType::from_kv(DataType::Varchar, DataType::Jsonb)),
3047 ),
3048 ])),
3049 ),
3050 ])),
3051 "a",
3052 ),
3053 Field::with_name(
3054 DataType::list(DataType::Struct(StructType::new(vec![
3055 ("b1", DataType::Int32),
3056 ("b2", DataType::Bytea),
3057 (
3058 "b3",
3059 DataType::Map(MapType::from_kv(DataType::Varchar, DataType::Jsonb)),
3060 ),
3061 ]))),
3062 "b",
3063 ),
3064 Field::with_name(
3065 DataType::Map(MapType::from_kv(
3066 DataType::Varchar,
3067 DataType::list(DataType::Struct(StructType::new([
3068 ("c1", DataType::Int32),
3069 ("c2", DataType::Bytea),
3070 (
3071 "c3",
3072 DataType::Map(MapType::from_kv(DataType::Varchar, DataType::Jsonb)),
3073 ),
3074 ]))),
3075 )),
3076 "c",
3077 ),
3078 ]);
3079 let arrow_schema = ArrowSchema::new(vec![
3080 ArrowField::new(
3081 "a",
3082 ArrowDataType::Struct(ArrowFields::from(vec![
3083 ArrowField::new("a1", ArrowDataType::Int32, false),
3084 ArrowField::new(
3085 "a2",
3086 ArrowDataType::Struct(ArrowFields::from(vec![
3087 ArrowField::new("a21", ArrowDataType::LargeBinary, false),
3088 ArrowField::new_map(
3089 "a22",
3090 "entries",
3091 ArrowFieldRef::new(ArrowField::new(
3092 "key",
3093 ArrowDataType::Utf8,
3094 false,
3095 )),
3096 ArrowFieldRef::new(ArrowField::new(
3097 "value",
3098 ArrowDataType::Utf8,
3099 false,
3100 )),
3101 false,
3102 false,
3103 ),
3104 ])),
3105 false,
3106 ),
3107 ])),
3108 false,
3109 ),
3110 ArrowField::new(
3111 "b",
3112 ArrowDataType::List(ArrowFieldRef::new(ArrowField::new_list_field(
3113 ArrowDataType::Struct(ArrowFields::from(vec![
3114 ArrowField::new("b1", ArrowDataType::Int32, false),
3115 ArrowField::new("b2", ArrowDataType::LargeBinary, false),
3116 ArrowField::new_map(
3117 "b3",
3118 "entries",
3119 ArrowFieldRef::new(ArrowField::new("key", ArrowDataType::Utf8, false)),
3120 ArrowFieldRef::new(ArrowField::new(
3121 "value",
3122 ArrowDataType::Utf8,
3123 false,
3124 )),
3125 false,
3126 false,
3127 ),
3128 ])),
3129 false,
3130 ))),
3131 false,
3132 ),
3133 ArrowField::new_map(
3134 "c",
3135 "entries",
3136 ArrowFieldRef::new(ArrowField::new("key", ArrowDataType::Utf8, false)),
3137 ArrowFieldRef::new(ArrowField::new(
3138 "value",
3139 ArrowDataType::List(ArrowFieldRef::new(ArrowField::new_list_field(
3140 ArrowDataType::Struct(ArrowFields::from(vec![
3141 ArrowField::new("c1", ArrowDataType::Int32, false),
3142 ArrowField::new("c2", ArrowDataType::LargeBinary, false),
3143 ArrowField::new_map(
3144 "c3",
3145 "entries",
3146 ArrowFieldRef::new(ArrowField::new(
3147 "key",
3148 ArrowDataType::Utf8,
3149 false,
3150 )),
3151 ArrowFieldRef::new(ArrowField::new(
3152 "value",
3153 ArrowDataType::Utf8,
3154 false,
3155 )),
3156 false,
3157 false,
3158 ),
3159 ])),
3160 false,
3161 ))),
3162 false,
3163 )),
3164 false,
3165 false,
3166 ),
3167 ]);
3168 try_matches_arrow_schema(&risingwave_schema, &arrow_schema).unwrap();
3169 }
3170
3171 #[test]
3172 fn test_parse_iceberg_config() {
3173 let values = [
3174 ("connector", "iceberg"),
3175 ("type", "upsert"),
3176 ("primary_key", "v1"),
3177 ("partition_by", "v1, identity(v1), truncate(4,v2), bucket(5,v1), year(v3), month(v4), day(v5), hour(v6), void(v1)"),
3178 ("warehouse.path", "s3://iceberg"),
3179 ("s3.endpoint", "http://127.0.0.1:9301"),
3180 ("s3.access.key", "hummockadmin"),
3181 ("s3.secret.key", "hummockadmin"),
3182 ("s3.path.style.access", "true"),
3183 ("s3.region", "us-east-1"),
3184 ("catalog.type", "jdbc"),
3185 ("catalog.name", "demo"),
3186 ("catalog.uri", "jdbc://postgresql://postgres:5432/iceberg"),
3187 ("catalog.jdbc.user", "admin"),
3188 ("catalog.jdbc.password", "123456"),
3189 ("database.name", "demo_db"),
3190 ("table.name", "demo_table"),
3191 ("enable_compaction", "true"),
3192 ("compaction_interval_sec", "1800"),
3193 ("enable_snapshot_expiration", "true"),
3194 ]
3195 .into_iter()
3196 .map(|(k, v)| (k.to_owned(), v.to_owned()))
3197 .collect();
3198
3199 let iceberg_config = IcebergConfig::from_btreemap(values).unwrap();
3200
3201 let expected_iceberg_config = IcebergConfig {
3202 common: IcebergCommon {
3203 warehouse_path: Some("s3://iceberg".to_owned()),
3204 catalog_uri: Some("jdbc://postgresql://postgres:5432/iceberg".to_owned()),
3205 s3_region: Some("us-east-1".to_owned()),
3206 s3_endpoint: Some("http://127.0.0.1:9301".to_owned()),
3207 s3_access_key: Some("hummockadmin".to_owned()),
3208 s3_secret_key: Some("hummockadmin".to_owned()),
3209 s3_iam_role_arn: None,
3210 gcs_credential: None,
3211 catalog_type: Some("jdbc".to_owned()),
3212 glue_id: None,
3213 glue_region: None,
3214 glue_access_key: None,
3215 glue_secret_key: None,
3216 glue_iam_role_arn: None,
3217 catalog_name: Some("demo".to_owned()),
3218 s3_path_style_access: Some(true),
3219 catalog_credential: None,
3220 catalog_oauth2_server_uri: None,
3221 catalog_scope: None,
3222 catalog_token: None,
3223 enable_config_load: None,
3224 rest_signing_name: None,
3225 rest_signing_region: None,
3226 rest_sigv4_enabled: None,
3227 hosted_catalog: None,
3228 azblob_account_name: None,
3229 azblob_account_key: None,
3230 azblob_endpoint_url: None,
3231 catalog_header: None,
3232 adlsgen2_account_name: None,
3233 adlsgen2_account_key: None,
3234 adlsgen2_endpoint: None,
3235 vended_credentials: None,
3236 catalog_security: None,
3237 gcp_auth_scopes: None,
3238 catalog_io_impl: None,
3239 },
3240 table: IcebergTableIdentifier {
3241 database_name: Some("demo_db".to_owned()),
3242 table_name: "demo_table".to_owned(),
3243 },
3244 r#type: "upsert".to_owned(),
3245 force_append_only: false,
3246 primary_key: Some(vec!["v1".to_owned()]),
3247 partition_by: Some("v1, identity(v1), truncate(4,v2), bucket(5,v1), year(v3), month(v4), day(v5), hour(v6), void(v1)".to_owned()),
3248 java_catalog_props: [("jdbc.user", "admin"), ("jdbc.password", "123456")]
3249 .into_iter()
3250 .map(|(k, v)| (k.to_owned(), v.to_owned()))
3251 .collect(),
3252 commit_checkpoint_interval: ICEBERG_DEFAULT_COMMIT_CHECKPOINT_INTERVAL,
3253 create_table_if_not_exists: false,
3254 is_exactly_once: Some(true),
3255 commit_retry_num: 8,
3256 enable_compaction: true,
3257 compaction_interval_sec: Some(DEFAULT_ICEBERG_COMPACTION_INTERVAL / 2),
3258 enable_snapshot_expiration: true,
3259 write_mode: IcebergWriteMode::MergeOnRead,
3260 format_version: FormatVersion::V2,
3261 snapshot_expiration_max_age_millis: None,
3262 snapshot_expiration_retain_last: None,
3263 snapshot_expiration_clear_expired_files: true,
3264 snapshot_expiration_clear_expired_meta_data: true,
3265 max_snapshots_num_before_compaction: None,
3266 small_files_threshold_mb: None,
3267 delete_files_count_threshold: None,
3268 trigger_snapshot_count: None,
3269 target_file_size_mb: None,
3270 compaction_type: None,
3271 write_parquet_compression: None,
3272 write_parquet_max_row_group_rows: None,
3273 };
3274
3275 assert_eq!(iceberg_config, expected_iceberg_config);
3276
3277 assert_eq!(
3278 &iceberg_config.full_table_name().unwrap().to_string(),
3279 "demo_db.demo_table"
3280 );
3281 }
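
    #[test]
    fn test_parse_partition_by_exprs() {
        // A small, hedged sketch (added for illustration; column names are
        // hypothetical) of the partition_by syntax handled by
        // `parse_partition_by_exprs`: bare columns become identity transforms,
        // while `bucket(n,col)` carries its parameter into the transform.
        use iceberg::spec::Transform;

        use super::parse_partition_by_exprs;

        let parsed = parse_partition_by_exprs("category, bucket(4,id)".to_owned()).unwrap();
        assert_eq!(
            parsed,
            vec![
                ("category".to_owned(), Transform::Identity),
                ("id".to_owned(), Transform::Bucket(4)),
            ]
        );
    }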
3282
3283 async fn test_create_catalog(configs: BTreeMap<String, String>) {
3284 let iceberg_config = IcebergConfig::from_btreemap(configs).unwrap();
3285
3286 let _table = iceberg_config.load_table().await.unwrap();
3287 }
3288
3289 #[tokio::test]
3290 #[ignore]
3291 async fn test_storage_catalog() {
3292 let values = [
3293 ("connector", "iceberg"),
3294 ("type", "append-only"),
3295 ("force_append_only", "true"),
3296 ("s3.endpoint", "http://127.0.0.1:9301"),
3297 ("s3.access.key", "hummockadmin"),
3298 ("s3.secret.key", "hummockadmin"),
3299 ("s3.region", "us-east-1"),
3300 ("s3.path.style.access", "true"),
3301 ("catalog.name", "demo"),
3302 ("catalog.type", "storage"),
3303 ("warehouse.path", "s3://icebergdata/demo"),
3304 ("database.name", "s1"),
3305 ("table.name", "t1"),
3306 ]
3307 .into_iter()
3308 .map(|(k, v)| (k.to_owned(), v.to_owned()))
3309 .collect();
3310
3311 test_create_catalog(values).await;
3312 }
3313
3314 #[tokio::test]
3315 #[ignore]
3316 async fn test_rest_catalog() {
3317 let values = [
3318 ("connector", "iceberg"),
3319 ("type", "append-only"),
3320 ("force_append_only", "true"),
3321 ("s3.endpoint", "http://127.0.0.1:9301"),
3322 ("s3.access.key", "hummockadmin"),
3323 ("s3.secret.key", "hummockadmin"),
3324 ("s3.region", "us-east-1"),
3325 ("s3.path.style.access", "true"),
3326 ("catalog.name", "demo"),
3327 ("catalog.type", "rest"),
3328 ("catalog.uri", "http://192.168.167.4:8181"),
3329 ("warehouse.path", "s3://icebergdata/demo"),
3330 ("database.name", "s1"),
3331 ("table.name", "t1"),
3332 ]
3333 .into_iter()
3334 .map(|(k, v)| (k.to_owned(), v.to_owned()))
3335 .collect();
3336
3337 test_create_catalog(values).await;
3338 }
3339
3340 #[tokio::test]
3341 #[ignore]
3342 async fn test_jdbc_catalog() {
3343 let values = [
3344 ("connector", "iceberg"),
3345 ("type", "append-only"),
3346 ("force_append_only", "true"),
3347 ("s3.endpoint", "http://127.0.0.1:9301"),
3348 ("s3.access.key", "hummockadmin"),
3349 ("s3.secret.key", "hummockadmin"),
3350 ("s3.region", "us-east-1"),
3351 ("s3.path.style.access", "true"),
3352 ("catalog.name", "demo"),
3353 ("catalog.type", "jdbc"),
3354 ("catalog.uri", "jdbc:postgresql://localhost:5432/iceberg"),
3355 ("catalog.jdbc.user", "admin"),
3356 ("catalog.jdbc.password", "123456"),
3357 ("warehouse.path", "s3://icebergdata/demo"),
3358 ("database.name", "s1"),
3359 ("table.name", "t1"),
3360 ]
3361 .into_iter()
3362 .map(|(k, v)| (k.to_owned(), v.to_owned()))
3363 .collect();
3364
3365 test_create_catalog(values).await;
3366 }
3367
3368 #[tokio::test]
3369 #[ignore]
3370 async fn test_hive_catalog() {
3371 let values = [
3372 ("connector", "iceberg"),
3373 ("type", "append-only"),
3374 ("force_append_only", "true"),
3375 ("s3.endpoint", "http://127.0.0.1:9301"),
3376 ("s3.access.key", "hummockadmin"),
3377 ("s3.secret.key", "hummockadmin"),
3378 ("s3.region", "us-east-1"),
3379 ("s3.path.style.access", "true"),
3380 ("catalog.name", "demo"),
3381 ("catalog.type", "hive"),
3382 ("catalog.uri", "thrift://localhost:9083"),
3383 ("warehouse.path", "s3://icebergdata/demo"),
3384 ("database.name", "s1"),
3385 ("table.name", "t1"),
3386 ]
3387 .into_iter()
3388 .map(|(k, v)| (k.to_owned(), v.to_owned()))
3389 .collect();
3390
3391 test_create_catalog(values).await;
3392 }
3393
3394 #[test]
3396 fn test_parse_google_auth_config() {
3397 let values: BTreeMap<String, String> = [
3398 ("connector", "iceberg"),
3399 ("type", "append-only"),
3400 ("force_append_only", "true"),
3401 ("catalog.name", "biglake-catalog"),
3402 ("catalog.type", "rest"),
3403 ("catalog.uri", "https://biglake.googleapis.com/iceberg/v1/restcatalog"),
3404 ("warehouse.path", "bq://projects/my-gcp-project"),
3405 ("catalog.header", "x-goog-user-project=my-gcp-project"),
3406 ("catalog.security", "google"),
3407 ("gcp.auth.scopes", "https://www.googleapis.com/auth/cloud-platform,https://www.googleapis.com/auth/bigquery"),
3408 ("database.name", "my_dataset"),
3409 ("table.name", "my_table"),
3410 ]
3411 .into_iter()
3412 .map(|(k, v)| (k.to_owned(), v.to_owned()))
3413 .collect();
3414
3415 let config = IcebergConfig::from_btreemap(values).unwrap();
3416 assert_eq!(config.catalog_type(), "rest");
3417 assert_eq!(config.common.catalog_security.as_deref(), Some("google"));
3418 assert_eq!(
3419 config.common.gcp_auth_scopes.as_deref(),
3420 Some(
3421 "https://www.googleapis.com/auth/cloud-platform,https://www.googleapis.com/auth/bigquery"
3422 )
3423 );
3424 assert_eq!(
3425 config.common.warehouse_path.as_deref(),
3426 Some("bq://projects/my-gcp-project")
3427 );
3428 assert_eq!(
3429 config.common.catalog_header.as_deref(),
3430 Some("x-goog-user-project=my-gcp-project")
3431 );
3432 }
3433
3434 #[test]
3436 fn test_parse_oauth2_security_config() {
3437 let values: BTreeMap<String, String> = [
3438 ("connector", "iceberg"),
3439 ("type", "append-only"),
3440 ("force_append_only", "true"),
3441 ("catalog.name", "oauth2-catalog"),
3442 ("catalog.type", "rest"),
3443 ("catalog.uri", "https://example.com/iceberg/rest"),
3444 ("warehouse.path", "s3://my-bucket/warehouse"),
3445 ("catalog.security", "oauth2"),
3446 ("catalog.credential", "client_id:client_secret"),
3447 ("catalog.token", "bearer-token"),
3448 (
3449 "catalog.oauth2_server_uri",
3450 "https://oauth.example.com/token",
3451 ),
3452 ("catalog.scope", "read write"),
3453 ("database.name", "test_db"),
3454 ("table.name", "test_table"),
3455 ]
3456 .into_iter()
3457 .map(|(k, v)| (k.to_owned(), v.to_owned()))
3458 .collect();
3459
3460 let iceberg_config = IcebergConfig::from_btreemap(values).unwrap();
3461
3462 assert_eq!(iceberg_config.catalog_type(), "rest");
3464
3465 assert_eq!(
3467 iceberg_config.common.catalog_security.as_deref(),
3468 Some("oauth2")
3469 );
3470 assert_eq!(
3471 iceberg_config.common.catalog_credential.as_deref(),
3472 Some("client_id:client_secret")
3473 );
3474 assert_eq!(
3475 iceberg_config.common.catalog_token.as_deref(),
3476 Some("bearer-token")
3477 );
3478 assert_eq!(
3479 iceberg_config.common.catalog_oauth2_server_uri.as_deref(),
3480 Some("https://oauth.example.com/token")
3481 );
3482 assert_eq!(
3483 iceberg_config.common.catalog_scope.as_deref(),
3484 Some("read write")
3485 );
3486 }
3487
3488 #[test]
3490 fn test_parse_invalid_security_config() {
3491 let values: BTreeMap<String, String> = [
3492 ("connector", "iceberg"),
3493 ("type", "append-only"),
3494 ("force_append_only", "true"),
3495 ("catalog.name", "invalid-catalog"),
3496 ("catalog.type", "rest"),
3497 ("catalog.uri", "https://example.com/iceberg/rest"),
3498 ("warehouse.path", "s3://my-bucket/warehouse"),
3499 ("catalog.security", "invalid_security_type"),
3500 ("database.name", "test_db"),
3501 ("table.name", "test_table"),
3502 ]
3503 .into_iter()
3504 .map(|(k, v)| (k.to_owned(), v.to_owned()))
3505 .collect();
3506
3507 let iceberg_config = IcebergConfig::from_btreemap(values).unwrap();
3509
3510 assert_eq!(
3512 iceberg_config.common.catalog_security.as_deref(),
3513 Some("invalid_security_type")
3514 );
3515
3516 assert_eq!(iceberg_config.catalog_type(), "rest");
3518 }
3519
3520 #[test]
3522 fn test_parse_custom_io_impl_config() {
3523 let values: BTreeMap<String, String> = [
3524 ("connector", "iceberg"),
3525 ("type", "append-only"),
3526 ("force_append_only", "true"),
3527 ("catalog.name", "gcs-catalog"),
3528 ("catalog.type", "rest"),
3529 ("catalog.uri", "https://example.com/iceberg/rest"),
3530 ("warehouse.path", "gs://my-bucket/warehouse"),
3531 ("catalog.security", "google"),
3532 ("catalog.io_impl", "org.apache.iceberg.gcp.gcs.GCSFileIO"),
3533 ("database.name", "test_db"),
3534 ("table.name", "test_table"),
3535 ]
3536 .into_iter()
3537 .map(|(k, v)| (k.to_owned(), v.to_owned()))
3538 .collect();
3539
3540 let iceberg_config = IcebergConfig::from_btreemap(values).unwrap();
3541
3542 assert_eq!(iceberg_config.catalog_type(), "rest");
3544
3545 assert_eq!(
3547 iceberg_config.common.catalog_io_impl.as_deref(),
3548 Some("org.apache.iceberg.gcp.gcs.GCSFileIO")
3549 );
3550
3551 assert_eq!(
3553 iceberg_config.common.catalog_security.as_deref(),
3554 Some("google")
3555 );
3556 }
3557
3558 #[test]
3559 fn test_config_constants_consistency() {
3560 assert_eq!(ENABLE_COMPACTION, "enable_compaction");
3563 assert_eq!(COMPACTION_INTERVAL_SEC, "compaction_interval_sec");
3564 assert_eq!(ENABLE_SNAPSHOT_EXPIRATION, "enable_snapshot_expiration");
3565 assert_eq!(WRITE_MODE, "write_mode");
3566 assert_eq!(
3567 SNAPSHOT_EXPIRATION_RETAIN_LAST,
3568 "snapshot_expiration_retain_last"
3569 );
3570 assert_eq!(
3571 SNAPSHOT_EXPIRATION_MAX_AGE_MILLIS,
3572 "snapshot_expiration_max_age_millis"
3573 );
3574 assert_eq!(
3575 SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_FILES,
3576 "snapshot_expiration_clear_expired_files"
3577 );
3578 assert_eq!(
3579 SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_META_DATA,
3580 "snapshot_expiration_clear_expired_meta_data"
3581 );
3582 assert_eq!(COMPACTION_MAX_SNAPSHOTS_NUM, "compaction.max_snapshots_num");
3583 }
3584
3585 #[test]
3587 fn test_parse_compaction_config() {
3588 let values: BTreeMap<String, String> = [
3590 ("connector", "iceberg"),
3591 ("type", "upsert"),
3592 ("primary_key", "id"),
3593 ("warehouse.path", "s3://iceberg"),
3594 ("s3.endpoint", "http://127.0.0.1:9301"),
3595 ("s3.access.key", "test"),
3596 ("s3.secret.key", "test"),
3597 ("s3.region", "us-east-1"),
3598 ("catalog.type", "storage"),
3599 ("catalog.name", "demo"),
3600 ("database.name", "test_db"),
3601 ("table.name", "test_table"),
3602 ("enable_compaction", "true"),
3603 ("compaction.max_snapshots_num", "100"),
3604 ("compaction.small_files_threshold_mb", "512"),
3605 ("compaction.delete_files_count_threshold", "50"),
3606 ("compaction.trigger_snapshot_count", "10"),
3607 ("compaction.target_file_size_mb", "256"),
3608 ("compaction.type", "full"),
3609 ("compaction.write_parquet_compression", "zstd"),
3610 ("compaction.write_parquet_max_row_group_rows", "50000"),
3611 ]
3612 .into_iter()
3613 .map(|(k, v)| (k.to_owned(), v.to_owned()))
3614 .collect();
3615
3616 let config = IcebergConfig::from_btreemap(values).unwrap();
3617 assert!(config.enable_compaction);
3618 assert_eq!(config.max_snapshots_num_before_compaction, Some(100));
3619 assert_eq!(config.small_files_threshold_mb, Some(512));
3620 assert_eq!(config.delete_files_count_threshold, Some(50));
3621 assert_eq!(config.trigger_snapshot_count, Some(10));
3622 assert_eq!(config.target_file_size_mb, Some(256));
3623 assert_eq!(config.compaction_type, Some(CompactionType::Full));
3624 assert_eq!(config.target_file_size_mb(), 256);
3625 assert_eq!(config.write_parquet_compression(), "zstd");
3626 assert_eq!(config.write_parquet_max_row_group_rows(), 50000);
3627
3628 let values: BTreeMap<String, String> = [
3630 ("connector", "iceberg"),
3631 ("type", "append-only"),
3632 ("force_append_only", "true"),
3633 ("catalog.name", "test-catalog"),
3634 ("catalog.type", "storage"),
3635 ("warehouse.path", "s3://my-bucket/warehouse"),
3636 ("database.name", "test_db"),
3637 ("table.name", "test_table"),
3638 ]
3639 .into_iter()
3640 .map(|(k, v)| (k.to_owned(), v.to_owned()))
3641 .collect();
3642
        let config = IcebergConfig::from_btreemap(values).unwrap();
        // Defaults used when the compaction options are not configured.
        assert_eq!(config.target_file_size_mb(), 1024);
        assert_eq!(config.write_parquet_compression(), "zstd");
        assert_eq!(config.write_parquet_max_row_group_rows(), 122880);
    }
3648
3649 #[test]
3651 fn test_parse_parquet_compression() {
3652 use parquet::basic::Compression;
3653
3654 use super::parse_parquet_compression;
3655
3656 assert!(matches!(
3658 parse_parquet_compression("snappy"),
3659 Compression::SNAPPY
3660 ));
3661 assert!(matches!(
3662 parse_parquet_compression("gzip"),
3663 Compression::GZIP(_)
3664 ));
3665 assert!(matches!(
3666 parse_parquet_compression("zstd"),
3667 Compression::ZSTD(_)
3668 ));
3669 assert!(matches!(parse_parquet_compression("lz4"), Compression::LZ4));
3670 assert!(matches!(
3671 parse_parquet_compression("brotli"),
3672 Compression::BROTLI(_)
3673 ));
3674 assert!(matches!(
3675 parse_parquet_compression("uncompressed"),
3676 Compression::UNCOMPRESSED
3677 ));
3678
3679 assert!(matches!(
3681 parse_parquet_compression("SNAPPY"),
3682 Compression::SNAPPY
3683 ));
3684 assert!(matches!(
3685 parse_parquet_compression("Zstd"),
3686 Compression::ZSTD(_)
3687 ));
3688
3689 assert!(matches!(
3691 parse_parquet_compression("invalid"),
3692 Compression::SNAPPY
3693 ));
3694 }
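
    #[test]
    fn test_metadata_bytes_roundtrip() {
        // Illustrative sketch (added for clarity): the coordinator exchanges
        // pre-commit metadata as a JSON-encoded `Vec<Vec<u8>>` (per-parallelism
        // commit results plus trailing little-endian snapshot-id bytes), so the
        // two helpers must round-trip losslessly.
        use super::{deserialize_metadata, serialize_metadata};

        let payloads = vec![b"commit-result".to_vec(), 42i64.to_le_bytes().to_vec()];
        let bytes = serialize_metadata(payloads.clone());
        assert_eq!(deserialize_metadata(bytes), payloads);
    }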
3695
3696 #[test]
3697 fn test_append_only_rejects_copy_on_write() {
3698 let values = [
3700 ("connector", "iceberg"),
3701 ("type", "append-only"),
3702 ("warehouse.path", "s3://iceberg"),
3703 ("s3.endpoint", "http://127.0.0.1:9301"),
3704 ("s3.access.key", "test"),
3705 ("s3.secret.key", "test"),
3706 ("s3.region", "us-east-1"),
3707 ("catalog.type", "storage"),
3708 ("catalog.name", "demo"),
3709 ("database.name", "test_db"),
3710 ("table.name", "test_table"),
3711 ("write_mode", "copy-on-write"),
3712 ]
3713 .into_iter()
3714 .map(|(k, v)| (k.to_owned(), v.to_owned()))
3715 .collect();
3716
3717 let result = IcebergConfig::from_btreemap(values);
3718 assert!(result.is_err());
3719 assert!(
3720 result
3721 .unwrap_err()
3722 .to_string()
3723 .contains("'copy-on-write' mode is not supported for append-only iceberg sink")
3724 );
3725 }
3726
3727 #[test]
3728 fn test_append_only_accepts_merge_on_read() {
3729 let values = [
3731 ("connector", "iceberg"),
3732 ("type", "append-only"),
3733 ("warehouse.path", "s3://iceberg"),
3734 ("s3.endpoint", "http://127.0.0.1:9301"),
3735 ("s3.access.key", "test"),
3736 ("s3.secret.key", "test"),
3737 ("s3.region", "us-east-1"),
3738 ("catalog.type", "storage"),
3739 ("catalog.name", "demo"),
3740 ("database.name", "test_db"),
3741 ("table.name", "test_table"),
3742 ("write_mode", "merge-on-read"),
3743 ]
3744 .into_iter()
3745 .map(|(k, v)| (k.to_owned(), v.to_owned()))
3746 .collect();
3747
3748 let result = IcebergConfig::from_btreemap(values);
3749 assert!(result.is_ok());
3750 let config = result.unwrap();
3751 assert_eq!(config.write_mode, IcebergWriteMode::MergeOnRead);
3752 }
3753
3754 #[test]
3755 fn test_append_only_defaults_to_merge_on_read() {
3756 let values = [
3758 ("connector", "iceberg"),
3759 ("type", "append-only"),
3760 ("warehouse.path", "s3://iceberg"),
3761 ("s3.endpoint", "http://127.0.0.1:9301"),
3762 ("s3.access.key", "test"),
3763 ("s3.secret.key", "test"),
3764 ("s3.region", "us-east-1"),
3765 ("catalog.type", "storage"),
3766 ("catalog.name", "demo"),
3767 ("database.name", "test_db"),
3768 ("table.name", "test_table"),
3769 ]
3770 .into_iter()
3771 .map(|(k, v)| (k.to_owned(), v.to_owned()))
3772 .collect();
3773
3774 let result = IcebergConfig::from_btreemap(values);
3775 assert!(result.is_ok());
3776 let config = result.unwrap();
3777 assert_eq!(config.write_mode, IcebergWriteMode::MergeOnRead);
3778 }
3779
3780 #[test]
3781 fn test_upsert_accepts_copy_on_write() {
3782 let values = [
3784 ("connector", "iceberg"),
3785 ("type", "upsert"),
3786 ("primary_key", "id"),
3787 ("warehouse.path", "s3://iceberg"),
3788 ("s3.endpoint", "http://127.0.0.1:9301"),
3789 ("s3.access.key", "test"),
3790 ("s3.secret.key", "test"),
3791 ("s3.region", "us-east-1"),
3792 ("catalog.type", "storage"),
3793 ("catalog.name", "demo"),
3794 ("database.name", "test_db"),
3795 ("table.name", "test_table"),
3796 ("write_mode", "copy-on-write"),
3797 ]
3798 .into_iter()
3799 .map(|(k, v)| (k.to_owned(), v.to_owned()))
3800 .collect();
3801
3802 let result = IcebergConfig::from_btreemap(values);
3803 assert!(result.is_ok());
3804 let config = result.unwrap();
3805 assert_eq!(config.write_mode, IcebergWriteMode::CopyOnWrite);
3806 }
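
    #[test]
    fn test_commit_branch_selection() {
        // Sketch of the branch-selection rule implemented by `commit_branch`:
        // only upsert sinks in copy-on-write mode commit to the dedicated COW
        // branch; every other combination targets the main branch.
        use super::*;

        assert!(should_enable_iceberg_cow(
            SINK_TYPE_UPSERT,
            IcebergWriteMode::CopyOnWrite
        ));
        assert_eq!(
            commit_branch(SINK_TYPE_UPSERT, IcebergWriteMode::CopyOnWrite),
            ICEBERG_COW_BRANCH
        );
        assert_eq!(
            commit_branch(SINK_TYPE_APPEND_ONLY, IcebergWriteMode::MergeOnRead),
            MAIN_BRANCH
        );
    }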
3807}